You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2015/09/16 01:47:54 UTC
[1/2] incubator-geode git commit: p2p readers and handshakers threads
are now pooled
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-332 [created] 6d6c760cc
p2p readers and handshakers threads are now pooled
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/fa017689
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/fa017689
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/fa017689
Branch: refs/heads/feature/GEODE-332
Commit: fa017689ef73d9003eade82c6cb27634c03be05f
Parents: 4e65f0c
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Sep 15 16:03:40 2015 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Sep 15 16:03:40 2015 -0700
----------------------------------------------------------------------
.../gemfire/internal/tcp/Connection.java | 58 +++++++++++--------
.../gemfire/internal/tcp/ConnectionTable.java | 61 +++++++++++++++++++-
2 files changed, 93 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fa017689/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
index cd1b7dc..630ecfe 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
@@ -115,6 +115,11 @@ public class Connection implements Runnable {
/** the table holding this connection */
final ConnectionTable owner;
+
+ /** Set to false once run() is terminating. Using this instead of Thread.isAlive
+ * as the reader thread may be a pooled thread.
+ */
+ private volatile boolean isRunning = false;
/** true if connection is a shared resource that can be used by more than one thread */
private boolean sharedResource;
@@ -136,11 +141,14 @@ public class Connection implements Runnable {
}
private final static ThreadLocal isReaderThread = new ThreadLocal();
- // return true if this thread is a reader thread
public final static void makeReaderThread() {
// mark this thread as a reader thread
- isReaderThread.set(Boolean.TRUE);
+ makeReaderThread(true);
}
+ private final static void makeReaderThread(boolean v) {
+ isReaderThread.set(v);
+ }
+ // return true if this thread is a reader thread
public final static boolean isReaderThread() {
Object o = isReaderThread.get();
if (o == null) {
@@ -523,7 +531,7 @@ public class Connection implements Runnable {
Connection c = new Connection(t, s);
boolean readerStarted = false;
try {
- c.startReader();
+ c.startReader(t);
readerStarted = true;
} finally {
if (!readerStarted) {
@@ -822,11 +830,12 @@ public class Connection implements Runnable {
Runnable r = new Runnable() {
public void run() {
boolean rShuttingDown = readerShuttingDown;
+ final Thread localRef = readerThread;
synchronized(stateLock) {
- if (readerThread != null && readerThread.isAlive() &&
+ if (localRef != null && isRunning &&
!rShuttingDown && connectionState == STATE_READING
|| connectionState == STATE_READING_ACK) {
- readerThread.interrupt();
+ localRef.interrupt();
}
}
}
@@ -951,7 +960,7 @@ public class Connection implements Runnable {
*
* @throws IOException if handshake fails
*/
- private void attemptHandshake() throws IOException {
+ private void attemptHandshake(ConnectionTable connTable) throws IOException {
// send HANDSHAKE
// send this server's port. It's expected on the other side
if (useNIO()) {
@@ -961,7 +970,7 @@ public class Connection implements Runnable {
handshakeStream();
}
- startReader(); // this reader only reads the handshake and then exits
+ startReader(connTable); // this reader only reads the handshake and then exits
waitForHandshake(); // waiting for reply
}
@@ -1099,7 +1108,7 @@ public class Connection implements Runnable {
if (conn != null) {
// handshake
try {
- conn.attemptHandshake();
+ conn.attemptHandshake(t);
if (conn.isSocketClosed()) {
// something went wrong while reading the handshake
// and the socket was closed or this guy sent us a
@@ -1601,14 +1610,14 @@ public class Connection implements Runnable {
this.owner.owner.getLocalId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor"));
}
- if (!beingSick && this.readerThread != null && !isIBM && this.readerThread.isAlive()
+ if (!beingSick && this.readerThread != null && !isIBM && this.isRunning
&& this.readerThread != Thread.currentThread()) {
try {
this.readerThread.join(500);
- if (this.readerThread.isAlive() && !this.readerShuttingDown
+ if (this.isRunning && !this.readerShuttingDown
&& owner.getDM().getRootCause() == null) { // don't wait twice if there's a system failure
this.readerThread.join(1500);
- if (this.readerThread.isAlive()) {
+ if (this.isRunning) {
logger.info(LocalizedMessage.create(LocalizedStrings.Connection_TIMED_OUT_WAITING_FOR_READERTHREAD_ON_0_TO_FINISH, this));
}
}
@@ -1677,26 +1686,22 @@ public class Connection implements Runnable {
}
/** starts a reader thread */
- private void startReader() {
- ThreadGroup group =
- LoggingThreadGroup.createThreadGroup("P2P Reader Threads", logger);
- Assert.assertTrue(this.readerThread == null);
- this.readerThread =
- new Thread(group, this, p2pReaderName());
- this.readerThread.setDaemon(true);
- stopped = false;
- this.readerThread.start();
- }
+ private void startReader(ConnectionTable connTable) {
+ Assert.assertTrue(!this.isRunning);
+ stopped = false;
+ this.isRunning = true;
+ connTable.executeCommand(this);
+ }
/** in order to read non-NIO socket-based messages we need to have a thread
actively trying to grab bytes out of the sockets input queue.
This is that thread. */
public void run() {
+ this.readerThread = Thread.currentThread();
+ this.readerThread.setName(p2pReaderName());
ConnectionTable.threadWantsSharedResources();
- if (this.isReceiver) {
- makeReaderThread();
- }
+ makeReaderThread(this.isReceiver);
try {
if (useNIO()) {
runNioReader();
@@ -1725,6 +1730,9 @@ public class Connection implements Runnable {
// for the handshake.
// see bug 37524 for an example of listeners hung in waitForHandshake
notifyHandshakeWaiter(false);
+ this.isRunning = false;
+ this.readerThread.setName("idle p2p reader");
+ this.readerThread = null;
} // finally
}
@@ -3307,7 +3315,7 @@ public class Connection implements Runnable {
protected Object stateLock = new Object();
/** for timeout processing, this is the current state of the connection */
- protected byte connectionState;
+ protected byte connectionState = STATE_IDLE;
/*~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~*/
/** the connection is idle, but may be in use */
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fa017689/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
index 9beb947..525c687 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
@@ -19,8 +19,15 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.Logger;
@@ -37,6 +44,7 @@ import com.gemstone.gemfire.internal.SocketCreator;
import com.gemstone.gemfire.internal.SystemTimer;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
@@ -129,7 +137,13 @@ public class ConnectionTable {
*/
private volatile boolean closed = false;
-
+ /**
+ * Executor used by p2p reader and p2p handshaker threads.
+ */
+ private Executor p2pReaderThreadPool;
+ /** Number of seconds to wait before timing out an unused p2p reader thread. Default is 120 (2 minutes). */
+ private final static long READER_POOL_KEEP_ALIVE_TIME = Long.getLong("p2p.READER_POOL_KEEP_ALIVE_TIME", 120).longValue();
+
/**
* The most recent instance to be created
*
@@ -202,11 +216,40 @@ public class ConnectionTable {
this.threadOrderedConnMap = new ThreadLocal();
this.threadConnMaps = new ArrayList();
this.threadConnectionMap = new ConcurrentHashMap();
+ this.p2pReaderThreadPool = createThreadPoolForIO(c.getDM().getSystem().isShareSockets());
/* NOMUX: if (TCPConduit.useNIO) {
inputMuxManager = new InputMuxManager(this);
inputMuxManager.start(c.logger);
}*/
}
+
+ private Executor createThreadPoolForIO(boolean conserveSockets) {
+ Executor executor = null;
+ final ThreadGroup connectionRWGroup = LoggingThreadGroup.createThreadGroup("P2P Reader Threads", logger);
+ if (conserveSockets) {
+ executor = new Executor() {
+ @Override
+ public void execute(Runnable command) {
+ Thread th = new Thread(connectionRWGroup, command);
+ th.setDaemon(true);
+ th.start();
+ }
+ };
+ }
+ else {
+ BlockingQueue synchronousQueue = new SynchronousQueue();
+ ThreadFactory tf = new ThreadFactory() {
+ public Thread newThread(final Runnable command) {
+ Thread thread = new Thread(connectionRWGroup, command);
+ thread.setDaemon(true);
+ return thread;
+ }
+ };
+ executor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, READER_POOL_KEEP_ALIVE_TIME,
+ TimeUnit.SECONDS, synchronousQueue, tf);
+ }
+ return executor;
+ }
/** conduit sends connected() after establishing the server socket */
// protected void connected() {
@@ -715,6 +758,15 @@ public class ConnectionTable {
this.threadConnMaps.clear();
}
}
+ {
+ Executor localExec = this.p2pReaderThreadPool;
+ if (localExec != null) {
+ if (localExec instanceof ExecutorService) {
+ ((ExecutorService)localExec).shutdown();
+ }
+ this.p2pReaderThreadPool = null;
+ }
+ }
closeReceivers(false);
Map m = (Map)this.threadOrderedConnMap.get();
@@ -726,6 +778,13 @@ public class ConnectionTable {
}
}
+ public void executeCommand(Runnable runnable) {
+ Executor local = this.p2pReaderThreadPool;
+ if (local != null) {
+ local.execute(runnable);
+ }
+ }
+
/**
* Close all receiving threads. This is used during shutdown and is also
* used by a test hook that makes us deaf to incoming messages.
[2/2] incubator-geode git commit: async close now done with thread
pool
Posted by ds...@apache.org.
async close now done with thread pool
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/6d6c760c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/6d6c760c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/6d6c760c
Branch: refs/heads/feature/GEODE-332
Commit: 6d6c760ccb17d4bd0d683072f1d79dc895926c27
Parents: fa01768
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Sep 15 16:47:17 2015 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Sep 15 16:47:17 2015 -0700
----------------------------------------------------------------------
.../gemfire/internal/SocketCreator.java | 69 +++++++++++++-------
.../gemfire/internal/tcp/ConnectionTable.java | 1 +
2 files changed, 48 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6d6c760c/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
index ff4a22c..0688c3d 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
@@ -81,6 +81,11 @@ import com.gemstone.gemfire.internal.util.PasswordUtil;
import com.gemstone.org.jgroups.util.ConnectionWatcher;
import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import javax.net.ssl.*;
@@ -1197,6 +1202,40 @@ public class SocketCreator implements com.gemstone.org.jgroups.util.SockCreator
return (String[]) v.toArray( new String[ v.size() ] );
}
+ /** thread pool of async close threads */
+ private static ThreadPoolExecutor asyncCloseExecutor;
+ /** Number of seconds to wait before timing out an unused async close thread. Default is 120 (2 minutes). */
+ private final static long ASYNC_CLOSE_POOL_KEEP_ALIVE_TIME = Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_TIME", 120).longValue();
+
+ private static synchronized ThreadPoolExecutor getAsyncThreadExecutor() {
+ ThreadPoolExecutor pool = asyncCloseExecutor;
+ if (pool == null) {
+ final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
+ ThreadFactory tf = new ThreadFactory() {
+ public Thread newThread(final Runnable command) {
+ Thread thread = new Thread(tg, command);
+ thread.setDaemon(true);
+ return thread;
+ }
+ };
+ BlockingQueue synchronousQueue = new SynchronousQueue();
+ pool = new ThreadPoolExecutor(1, Integer.MAX_VALUE, ASYNC_CLOSE_POOL_KEEP_ALIVE_TIME, TimeUnit.SECONDS, synchronousQueue, tf);
+ asyncCloseExecutor = pool;
+ }
+ return pool;
+ }
+ public static synchronized void closeAsyncThreadExecutor() {
+ ThreadPoolExecutor pool = asyncCloseExecutor;
+ if (pool != null) {
+ pool.shutdownNow();
+ asyncCloseExecutor = null;
+ }
+ }
+ private static synchronized void asyncExecute(Runnable r) {
+ // The old code waited 50ms for the async task to complete.
+ // Should this code use submit on the executor and also wait 50ms?
+ getAsyncThreadExecutor().execute(r);
+ }
/**
* Closes the specified socket in a background thread and waits a limited
* amount of time for the close to complete. In some cases we see close
@@ -1206,44 +1245,30 @@ public class SocketCreator implements com.gemstone.org.jgroups.util.SockCreator
* @param who who the socket is connected to
* @param extra an optional Runnable with stuff to execute in the async thread
*/
- public static void asyncClose(final Socket sock, String who, final Runnable extra) {
+ public static void asyncClose(final Socket sock, final String who, final Runnable extra) {
if (sock == null || sock.isClosed()) {
return;
}
try {
- ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
-
- Thread t = new Thread(tg, new Runnable() {
+ asyncExecute(new Runnable() {
public void run() {
+ Thread.currentThread().setName("AsyncSocketCloser for " + who);
+ try {
if (extra != null) {
extra.run();
}
inlineClose(sock);
+ } finally {
+ Thread.currentThread().setName("unused AsyncSocketCloser");
+ }
}
- }, "AsyncSocketCloser for " + who);
- t.setDaemon(true);
- try {
- t.start();
+ });
} catch (OutOfMemoryError ignore) {
// If we can't start a thread to close the socket just do it inline.
// See bug 50573.
inlineClose(sock);
return;
}
- try {
- // [bruce] if the network fails, this will wait the full amount of time
- // on every close, so it must be kept very short. it was 750ms before,
- // causing frequent hangs in net-down hydra tests
- t.join(50/*ms*/);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- }
- catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- // NOTREACHED
- throw e;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6d6c760c/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
index 525c687..84ea1eb 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
@@ -776,6 +776,7 @@ public class ConnectionTable {
m.clear();
}
}
+ SocketCreator.closeAsyncThreadExecutor();
}
public void executeCommand(Runnable runnable) {