You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/08/11 16:51:55 UTC
[2/2] geode git commit: GEODE-3416: Cleanup SocketCloser code to
reduce the synchronization Old code would force single threaded behavior,
new code will synchronize on the checking or changing or the closed flag
GEODE-3416: Cleanup SocketCloser code to reduce the synchronization
Old code would force single threaded behavior, new code will synchronize
on the checking or changing or the closed flag
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/821e03dc
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/821e03dc
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/821e03dc
Branch: refs/heads/feature/GEODE-3416
Commit: 821e03dc6f45a6a9a4c43045dfbb235a626c2ec7
Parents: ea2fc82
Author: Udo Kohlmeyer <uk...@pivotal.io>
Authored: Fri Aug 11 09:45:45 2017 -0700
Committer: Udo Kohlmeyer <uk...@pivotal.io>
Committed: Fri Aug 11 09:51:34 2017 -0700
----------------------------------------------------------------------
.../cache/tier/sockets/CacheClientProxy.java | 4 +-
.../apache/geode/internal/net/SocketCloser.java | 192 +++++++++++++------
.../apache/geode/internal/tcp/Connection.java | 4 +-
.../internal/net/SocketCloserJUnitTest.java | 10 +-
4 files changed, 140 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/821e03dc/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 98bfed9..34f232d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -949,7 +949,8 @@ public class CacheClientProxy implements ClientSession {
private void closeSocket() {
if (this._socketClosed.compareAndSet(false, true)) {
// Close the socket
- this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, null);
+ this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, this._remoteHostAddress,
+ null);
getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
}
}
@@ -963,6 +964,7 @@ public class CacheClientProxy implements ClientSession {
{
String remoteHostAddress = this._remoteHostAddress;
if (remoteHostAddress != null) {
+ this._cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress);
this._remoteHostAddress = null;
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/821e03dc/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
index fbbe797..46d69a8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
@@ -21,8 +21,10 @@ import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.net.Socket;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
@@ -49,27 +51,32 @@ public class SocketCloser {
* minutes).
*/
static final long ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS =
- Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120);
+ Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120).longValue();
/**
* Maximum number of threads that can be doing a socket close. Any close requests over this max
* will queue up waiting for a thread.
*/
- private static final int ASYNC_CLOSE_POOL_MAX_THREADS =
- Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8);
+ static final int ASYNC_CLOSE_POOL_MAX_THREADS =
+ Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 4).intValue();
/**
* How many milliseconds the synchronous requester waits for the async close to happen. Default is
* 0. Prior releases waited 50ms.
*/
- private static final long ASYNC_CLOSE_WAIT_MILLISECONDS =
- Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0);
+ static final long ASYNC_CLOSE_WAIT_MILLISECONDS =
+ Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0).longValue();
+ /**
+ * map of thread pools of async close threads
+ */
+ private final ConcurrentHashMap<String, ExecutorService>
+ asyncCloseExecutors =
+ new ConcurrentHashMap<>();
private final long asyncClosePoolKeepAliveSeconds;
private final int asyncClosePoolMaxThreads;
private final long asyncCloseWaitTime;
private final TimeUnit asyncCloseWaitUnits;
- private boolean closed;
- private final ExecutorService socketCloserThreadPool;
+ private Boolean closed = Boolean.FALSE;
public SocketCloser() {
this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, ASYNC_CLOSE_POOL_MAX_THREADS,
@@ -82,30 +89,71 @@ public class SocketCloser {
}
public SocketCloser(long asyncClosePoolKeepAliveSeconds, int asyncClosePoolMaxThreads,
- long asyncCloseWaitTime, TimeUnit asyncCloseWaitUnits) {
+ long asyncCloseWaitTime, TimeUnit asyncCloseWaitUnits) {
this.asyncClosePoolKeepAliveSeconds = asyncClosePoolKeepAliveSeconds;
this.asyncClosePoolMaxThreads = asyncClosePoolMaxThreads;
this.asyncCloseWaitTime = asyncCloseWaitTime;
this.asyncCloseWaitUnits = asyncCloseWaitUnits;
-
- final ThreadGroup threadGroup =
- LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
- ThreadFactory threadFactory = command -> {
- Thread thread = new Thread(threadGroup, command);
- thread.setDaemon(true);
- return thread;
- };
- socketCloserThreadPool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads,
- this.asyncClosePoolMaxThreads, this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(), threadFactory);
}
public int getMaxThreads() {
return this.asyncClosePoolMaxThreads;
}
- private boolean isClosed() {
- return this.closed;
+ private ExecutorService getAsyncThreadExecutor(String address) {
+ ExecutorService executorService = asyncCloseExecutors.get(address);
+ if (executorService == null) {
+ //To be used for pre-1.8 jdk releases.
+// createThreadPool();
+
+ executorService = Executors.newWorkStealingPool(asyncClosePoolMaxThreads);
+
+ ExecutorService
+ previousThreadPoolExecutor =
+ asyncCloseExecutors.putIfAbsent(address, executorService);
+
+ if (previousThreadPoolExecutor != null) {
+ executorService.shutdownNow();
+ return previousThreadPoolExecutor;
+ }
+ }
+ return executorService;
+ }
+
+ /**
+ * @deprecated this method is to be used for pre 1.8 jdk.
+ */
+ @Deprecated
+ private void createThreadPool() {
+ ExecutorService executorService;
+ final ThreadGroup
+ threadGroup =
+ LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
+ ThreadFactory threadFactory = new ThreadFactory() {
+ public Thread newThread(final Runnable command) {
+ Thread thread = new Thread(threadGroup, command);
+ thread.setDaemon(true);
+ return thread;
+ }
+ };
+
+ executorService =
+ new ThreadPoolExecutor(asyncClosePoolMaxThreads, asyncClosePoolMaxThreads,
+ asyncCloseWaitTime,
+ asyncCloseWaitUnits, new LinkedBlockingQueue<>(), threadFactory);
+ }
+
+ /**
+ * Call this method if you know all the resources in the closer for the given address are no
+ * longer needed. Currently a thread pool is kept for each address and if you know that an address
+ * no longer needs its pool then you should call this method.
+ */
+
+ public void releaseResourcesForAddress(String address) {
+ ExecutorService executorService = asyncCloseExecutors.remove(address);
+ if (executorService != null) {
+ executorService.shutdown();
+ }
}
/**
@@ -113,84 +161,104 @@ public class SocketCloser {
* called then the asyncClose will be done synchronously.
*/
public void close() {
- if (!this.closed) {
- this.closed = true;
- socketCloserThreadPool.shutdown();
+ synchronized (closed) {
+ if (!this.closed) {
+ this.closed = true;
+ } else {
+ return;
+ }
+ }
+ for (ExecutorService executorService : asyncCloseExecutors.values()) {
+ executorService.shutdown();
+ asyncCloseExecutors.clear();
}
}
+ private Future asyncExecute(String address, Runnable runnableToExecute) {
+ ExecutorService asyncThreadExecutor = getAsyncThreadExecutor(address);
+ return asyncThreadExecutor.submit(runnableToExecute);
+ }
+
/**
* Closes the specified socket in a background thread. In some cases we see close hang (see bug
* 33665). Depending on how the SocketCloser is configured (see ASYNC_CLOSE_WAIT_MILLISECONDS)
* this method may block for a certain amount of time. If it is called after the SocketCloser is
* closed then a normal synchronous close is done.
- *
- * @param socket the socket to close
- * @param runnableCode an optional Runnable with stuff to execute in the async thread
+ * @param sock the socket to close
+ * @param address identifies who the socket is connected to
+ * @param extra an optional Runnable with stuff to execute in the async thread
*/
- public void asyncClose(final Socket socket, final Runnable runnableCode) {
- if (socket == null || socket.isClosed()) {
+ public void asyncClose(final Socket sock, final String address, final Runnable extra) {
+ if (sock == null || sock.isClosed()) {
return;
}
-
boolean doItInline = false;
try {
- if (isClosed()) {
- // this SocketCloser has been closed so do a synchronous, inline, close
- doItInline = true;
- } else {
- socketCloserThreadPool.execute(() -> {
- if (runnableCode != null) {
- runnableCode.run();
- }
- inlineClose(socket);
- });
- if (this.asyncCloseWaitTime != 0) {
- try {
- Future future = socketCloserThreadPool.submit(() -> {
- if (runnableCode != null) {
- runnableCode.run();
+ Future submittedTask = null;
+ synchronized (closed) {
+ if (closed) {
+ // this SocketCloser has been closed so do a synchronous, inline, close
+ doItInline = true;
+ } else {
+ submittedTask = asyncExecute(address, new Runnable() {
+ public void run() {
+ Thread.currentThread().setName("AsyncSocketCloser for " + address);
+ try {
+ if (extra != null) {
+ extra.run();
+ }
+ inlineClose(sock);
+ } finally {
+ Thread.currentThread().setName("unused AsyncSocketCloser");
}
- inlineClose(socket);
- });
- future.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- // We want this code to wait at most 50ms for the close to happen.
- // It is ok to ignore these exception and let the close continue
- // in the background.
- }
+ }
+ });
}
}
+ if (submittedTask != null) {
+ waitForFutureTaskWithTimeout(submittedTask);
+ }
} catch (OutOfMemoryError ignore) {
// If we can't start a thread to close the socket just do it inline.
// See bug 50573.
doItInline = true;
}
if (doItInline) {
- if (runnableCode != null) {
- runnableCode.run();
+ if (extra != null) {
+ extra.run();
}
- inlineClose(socket);
+ inlineClose(sock);
}
}
+ private void waitForFutureTaskWithTimeout(Future submittedTask) {
+ if (this.asyncCloseWaitTime != 0) {
+ try {
+ submittedTask.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ // We want this code to wait at most 50ms for the close to happen.
+ // It is ok to ignore these exception and let the close continue
+ // in the background.
+ }
+ }
+ }
/**
* Closes the specified socket
- *
- * @param socket the socket to close
+ * @param sock the socket to close
*/
- private void inlineClose(final Socket socket) {
+
+ private static void inlineClose(final Socket sock) {
// the next two statements are a mad attempt to fix bug
// 36041 - segv in jrockit in pthread signaling code. This
// seems to alleviate the problem.
try {
- socket.shutdownInput();
- socket.shutdownOutput();
+ sock.shutdownInput();
+ sock.shutdownOutput();
} catch (Exception e) {
}
try {
- socket.close();
+ sock.close();
} catch (IOException ignore) {
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
http://git-wip-us.apache.org/repos/asf/geode/blob/821e03dc/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 954a33c..0ecb3bf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -562,7 +562,7 @@ public class Connection implements Runnable {
} catch (IOException io) {
logger.fatal(LocalizedMessage
.create(LocalizedStrings.Connection_UNABLE_TO_GET_P2P_CONNECTION_STREAMS), io);
- t.getSocketCloser().asyncClose(socket, null);
+ t.getSocketCloser().asyncClose(socket, this.remoteAddr.toString(), null);
throw io;
}
}
@@ -847,7 +847,7 @@ public class Connection implements Runnable {
Socket s = this.socket;
if (s != null && !s.isClosed()) {
prepareForAsyncClose();
- this.owner.getSocketCloser().asyncClose(s, null);
+ this.owner.getSocketCloser().asyncClose(s, String.valueOf(this.remoteAddr), null);
}
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/821e03dc/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
index b6dbfe2..90315ce 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
@@ -72,11 +72,11 @@ public class SocketCloserJUnitTest {
// They should all be stuck on countDownLatch.
for (int i = 0; i < REMOTE_CLIENT_COUNT; i++) {
Socket[] aSockets = new Socket[SOCKET_COUNT];
-
+ String address = i + "";
for (int j = 0; j < SOCKET_COUNT; j++) {
aSockets[j] = createClosableSocket();
trackedSockets.add(aSockets[j]);
- this.socketCloser.asyncClose(aSockets[j], () -> {
+ this.socketCloser.asyncClose(aSockets[j], address, () -> {
try {
waitingToClose.incrementAndGet();
countDownLatch.await();
@@ -94,7 +94,7 @@ public class SocketCloserJUnitTest {
// since a thread pool is doing to closes
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
boolean areAllClosed = true;
- for (Iterator<Socket> iterator = trackedSockets.iterator(); iterator.hasNext();) {
+ for (Iterator<Socket> iterator = trackedSockets.iterator(); iterator.hasNext(); ) {
Socket socket = iterator.next();
if (socket.isClosed()) {
iterator.remove();
@@ -115,7 +115,7 @@ public class SocketCloserJUnitTest {
Socket s = createClosableSocket();
s.close();
- this.socketCloser.asyncClose(s, () -> runnableCalled.set(true));
+ this.socketCloser.asyncClose(s, "A", () -> runnableCalled.set(true));
Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> !runnableCalled.get());
}
@@ -128,7 +128,7 @@ public class SocketCloserJUnitTest {
final Socket closableSocket = createClosableSocket();
this.socketCloser.close();
- this.socketCloser.asyncClose(closableSocket, () -> runnableCalled.set(true));
+ this.socketCloser.asyncClose(closableSocket, "A", () -> runnableCalled.set(true));
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> runnableCalled.get() && closableSocket.isClosed());
}