You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/08/22 16:39:33 UTC
[14/14] geode git commit: GEODE-3416: Reduce synchronization
blockages in SocketCloser. Remove synchronization blocks around HashMap.
Replace that implementation with a simpler ThreadPool that is bounded and
does not grow as the number of remote addresses (clients/peers) increases
GEODE-3416: Reduce synchronization blockages in SocketCloser.
Remove synchronization blocks around HashMap. Replace that implementation
with a simpler ThreadPool that is bounded and does not grow as the
number of remote addresses (clients/peers) increases.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/26dcf436
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/26dcf436
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/26dcf436
Branch: refs/heads/feature/GEODE-3416
Commit: 26dcf43647117c1729cc7a7f010de2774785c183
Parents: d11e1b9
Author: Dave Barnes <db...@pivotal.io>
Authored: Thu Aug 10 17:11:50 2017 -0700
Committer: Udo Kohlmeyer <uk...@pivotal.io>
Committed: Tue Aug 22 09:38:51 2017 -0700
----------------------------------------------------------------------
.../cache/tier/sockets/CacheClientProxy.java | 47 +-----
.../apache/geode/internal/net/SocketCloser.java | 165 +++++++++++--------
.../internal/net/SocketCloserJUnitTest.java | 155 ++++++-----------
3 files changed, 141 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/26dcf436/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index d7e3548..34f232d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -181,11 +181,7 @@ public class CacheClientProxy implements ClientSession {
* True if we are connected to a client.
*/
private volatile boolean connected = false;
- // /**
- // * A string representing interest in all keys
- // */
- // protected static final String ALL_KEYS = "ALL_KEYS";
- //
+
/**
* True if a marker message is still in the ha queue.
*/
@@ -459,47 +455,6 @@ public class CacheClientProxy implements ClientSession {
return this.proxyID;
}
- // the following code was commented out simply because it was not used
- // /**
- // * Determines if the proxy represents the client host (and only the host, not
- // * necessarily the exact VM running on the host)
- // *
- // * @return Whether the proxy represents the client host
- // */
- // protected boolean representsClientHost(String clientHost)
- // {
- // // [bruce] TODO BUGBUGBUG: this should compare InetAddresses, not Strings
- // return this._remoteHostAddress.equals(clientHost);
- // }
-
- // protected boolean representsClientVM(DistributedMember remoteMember)
- // {
- // // logger.warn("Is input port " + clientPort + " contained in " +
- // // logger.warn("Does input host " + clientHost + " equal " +
- // // this._remoteHostAddress+ ": " + representsClientHost(clientHost));
- // // logger.warn("representsClientVM: " +
- // // (representsClientHost(clientHost) && containsPort(clientPort)));
- // return (proxyID.getDistributedMember().equals(remoteMember));
- // }
-
- // /**
- // * Determines if the CacheClientUpdater proxied by this instance is listening
- // * on the input clientHost and clientPort
- // *
- // * @param clientHost
- // * The host name of the client to compare
- // * @param clientPort
- // * The port number of the client to compare
- // *
- // * @return Whether the CacheClientUpdater proxied by this instance is
- // * listening on the input clientHost and clientPort
- // */
- // protected boolean representsCacheClientUpdater(String clientHost,
- // int clientPort)
- // {
- // return (clientPort == this._socket.getPort() && representsClientHost(clientHost));
- // }
-
protected boolean isMember(ClientProxyMembershipID memberId) {
return this.proxyID.equals(memberId);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/26dcf436/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
index 6d86fd8..0a9a903 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
@@ -14,11 +14,17 @@
*/
package org.apache.geode.internal.net;
+import org.apache.geode.SystemFailure;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.LoggingThreadGroup;
+import org.apache.logging.log4j.Logger;
+
import java.io.IOException;
import java.net.Socket;
-import java.util.HashMap;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
@@ -26,12 +32,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.SystemFailure;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.LoggingThreadGroup;
-
/**
* This class allows sockets to be closed without blocking. In some cases we have seen a call of
* socket.close block for minutes. This class maintains a thread pool for every other member we have
@@ -57,7 +57,7 @@ public class SocketCloser {
* will queue up waiting for a thread.
*/
static final int ASYNC_CLOSE_POOL_MAX_THREADS =
- Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8).intValue();
+ Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 4).intValue();
/**
* How many milliseconds the synchronous requester waits for the async close to happen. Default is
* 0. Prior releases waited 50ms.
@@ -66,13 +66,16 @@ public class SocketCloser {
Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0).longValue();
- /** map of thread pools of async close threads */
- private final HashMap<String, ThreadPoolExecutor> asyncCloseExecutors = new HashMap<>();
+ /**
+ * map of thread pools of async close threads
+ */
+ private final ConcurrentHashMap<String, ExecutorService> asyncCloseExecutors =
+ new ConcurrentHashMap<>();
private final long asyncClosePoolKeepAliveSeconds;
private final int asyncClosePoolMaxThreads;
private final long asyncCloseWaitTime;
private final TimeUnit asyncCloseWaitUnits;
- private boolean closed;
+ private Boolean closed = Boolean.FALSE;
public SocketCloser() {
this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, ASYNC_CLOSE_POOL_MAX_THREADS,
@@ -96,26 +99,47 @@ public class SocketCloser {
return this.asyncClosePoolMaxThreads;
}
- private ThreadPoolExecutor getAsyncThreadExecutor(String address) {
- synchronized (asyncCloseExecutors) {
- ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
- if (pool == null) {
- final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
- ThreadFactory tf = new ThreadFactory() {
- public Thread newThread(final Runnable command) {
- Thread thread = new Thread(tg, command);
- thread.setDaemon(true);
- return thread;
- }
- };
- BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
- pool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads, this.asyncClosePoolMaxThreads,
- this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, tf);
- pool.allowCoreThreadTimeOut(true);
- asyncCloseExecutors.put(address, pool);
+ private ExecutorService getAsyncThreadExecutor(String address) {
+ ExecutorService executorService = asyncCloseExecutors.get(address);
+ if (executorService == null) {
+ // To be used for pre-1.8 jdk releases.
+ // executorService = createThreadPoolExecutor();
+
+ executorService = getWorkStealingPool(asyncClosePoolMaxThreads);
+
+ ExecutorService previousThreadPoolExecutor =
+ asyncCloseExecutors.putIfAbsent(address, executorService);
+
+ if (previousThreadPoolExecutor != null) {
+ executorService.shutdownNow();
+ return previousThreadPoolExecutor;
}
- return pool;
}
+ return executorService;
+ }
+
+ private ExecutorService getWorkStealingPool(int maxParallelThreads) {
+ return Executors.newWorkStealingPool(maxParallelThreads);
+ }
+
+ /**
+ * @deprecated since GEODE 1.3.0. Use @link{getWorkStealingPool}
+ */
+ @Deprecated
+ private ExecutorService createThreadPoolExecutor() {
+ final ThreadGroup threadGroup =
+ LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
+ ThreadFactory threadFactory = new ThreadFactory() {
+ public Thread newThread(final Runnable command) {
+ Thread thread = new Thread(threadGroup, command);
+ thread.setDaemon(true);
+ return thread;
+ }
+ };
+
+ return new ThreadPoolExecutor(asyncClosePoolMaxThreads, asyncClosePoolMaxThreads,
+ asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+ threadFactory);
}
/**
@@ -123,19 +147,11 @@ public class SocketCloser {
* longer needed. Currently a thread pool is kept for each address and if you know that an address
* no longer needs its pool then you should call this method.
*/
- public void releaseResourcesForAddress(String address) {
- synchronized (asyncCloseExecutors) {
- ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
- if (pool != null) {
- pool.shutdown();
- asyncCloseExecutors.remove(address);
- }
- }
- }
- private boolean isClosed() {
- synchronized (asyncCloseExecutors) {
- return this.closed;
+ public void releaseResourcesForAddress(String address) {
+ ExecutorService executorService = asyncCloseExecutors.remove(address);
+ if (executorService != null) {
+ executorService.shutdown();
}
}
@@ -144,35 +160,22 @@ public class SocketCloser {
* called then the asyncClose will be done synchronously.
*/
public void close() {
- synchronized (asyncCloseExecutors) {
+ synchronized (closed) {
if (!this.closed) {
this.closed = true;
- for (ThreadPoolExecutor pool : asyncCloseExecutors.values()) {
- pool.shutdown();
- }
- asyncCloseExecutors.clear();
+ } else {
+ return;
}
}
+ for (ExecutorService executorService : asyncCloseExecutors.values()) {
+ executorService.shutdown();
+ }
+ asyncCloseExecutors.clear();
}
- private void asyncExecute(String address, Runnable r) {
- // Waiting 50ms for the async close request to complete is what the old (close per thread)
- // code did. But now that we will not create a thread for every close request
- // it seems better to let the thread that requested the close to move on quickly.
- // So the default has changed to not wait. The system property p2p.ASYNC_CLOSE_WAIT_MILLISECONDS
- // can be set to how many milliseconds to wait.
- if (this.asyncCloseWaitTime == 0) {
- getAsyncThreadExecutor(address).execute(r);
- } else {
- Future<?> future = getAsyncThreadExecutor(address).submit(r);
- try {
- future.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- // We want this code to wait at most 50ms for the close to happen.
- // It is ok to ignore these exception and let the close continue
- // in the background.
- }
- }
+ private Future asyncExecute(String address, Runnable runnableToExecute) {
+ ExecutorService asyncThreadExecutor = getAsyncThreadExecutor(address);
+ return asyncThreadExecutor.submit(runnableToExecute);
}
/**
@@ -181,29 +184,30 @@ public class SocketCloser {
* this method may block for a certain amount of time. If it is called after the SocketCloser is
* closed then a normal synchronous close is done.
*
- * @param sock the socket to close
+ * @param socket the socket to close
* @param address identifies who the socket is connected to
* @param extra an optional Runnable with stuff to execute in the async thread
*/
- public void asyncClose(final Socket sock, final String address, final Runnable extra) {
- if (sock == null || sock.isClosed()) {
+ public void asyncClose(final Socket socket, final String address, final Runnable extra) {
+ if (socket == null || socket.isClosed()) {
return;
}
boolean doItInline = false;
try {
- synchronized (asyncCloseExecutors) {
- if (isClosed()) {
+ Future submittedTask = null;
+ synchronized (closed) {
+ if (closed) {
// this SocketCloser has been closed so do a synchronous, inline, close
doItInline = true;
} else {
- asyncExecute(address, new Runnable() {
+ submittedTask = asyncExecute(address, new Runnable() {
public void run() {
Thread.currentThread().setName("AsyncSocketCloser for " + address);
try {
if (extra != null) {
extra.run();
}
- inlineClose(sock);
+ inlineClose(socket);
} finally {
Thread.currentThread().setName("unused AsyncSocketCloser");
}
@@ -211,6 +215,9 @@ public class SocketCloser {
});
}
}
+ if (submittedTask != null) {
+ waitForFutureTaskWithTimeout(submittedTask);
+ }
} catch (OutOfMemoryError ignore) {
// If we can't start a thread to close the socket just do it inline.
// See bug 50573.
@@ -220,16 +227,28 @@ public class SocketCloser {
if (extra != null) {
extra.run();
}
- inlineClose(sock);
+ inlineClose(socket);
}
}
+ private void waitForFutureTaskWithTimeout(Future submittedTask) {
+ if (this.asyncCloseWaitTime != 0) {
+ try {
+ submittedTask.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ // We want this code to wait at most the asyncCloseWaitTime for the close to happen.
+ // It is ok to ignore these exception and let the close continue
+ // in the background.
+ }
+ }
+ }
/**
* Closes the specified socket
*
* @param sock the socket to close
*/
+
private static void inlineClose(final Socket sock) {
// the next two statements are a mad attempt to fix bug
// 36041 - segv in jrockit in pthread signaling code. This
http://git-wip-us.apache.org/repos/asf/geode/blob/26dcf436/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
index 942cad4..a8b1d48 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
@@ -14,22 +14,21 @@
*/
package org.apache.geode.internal.net;
-import static org.junit.Assert.*;
-
-import java.net.Socket;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.apache.geode.internal.net.SocketCloser;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.junit.categories.UnitTest;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Tests the default SocketCloser.
@@ -62,86 +61,49 @@ public class SocketCloserJUnitTest {
*/
@Test
public void testAsync() {
- final CountDownLatch cdl = new CountDownLatch(1);
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
final AtomicInteger waitingToClose = new AtomicInteger(0);
- Runnable r = new Runnable() {
- @Override
- public void run() {
- try {
- waitingToClose.incrementAndGet();
- cdl.await();
- } catch (InterruptedException e) {
- }
- }
- };
final int SOCKET_COUNT = 100;
- final Socket[] aSockets = new Socket[SOCKET_COUNT];
- for (int i = 0; i < SOCKET_COUNT; i++) {
- aSockets[i] = createClosableSocket();
- }
- // Schedule a 100 sockets for async close.
- // They should all be stuck on cdl.
- for (int i = 0; i < SOCKET_COUNT; i++) {
- this.socketCloser.asyncClose(aSockets[i], "A", r);
- }
- // Make sure the sockets have not been closed
- for (int i = 0; i < SOCKET_COUNT; i++) {
- assertEquals(false, aSockets[i].isClosed());
- }
- final Socket[] bSockets = new Socket[SOCKET_COUNT];
- for (int i = 0; i < SOCKET_COUNT; i++) {
- bSockets[i] = createClosableSocket();
- }
+ final int REMOTE_CLIENT_COUNT = 200;
+
+ List<Socket> trackedSockets = new ArrayList<>();
// Schedule a 100 sockets for async close.
- // They should all be stuck on cdl.
- for (int i = 0; i < SOCKET_COUNT; i++) {
- this.socketCloser.asyncClose(bSockets[i], "B", r);
- }
- // Make sure the sockets have not been closed
- for (int i = 0; i < SOCKET_COUNT; i++) {
- assertEquals(false, bSockets[i].isClosed());
+ // They should all be stuck on countDownLatch.
+ for (int i = 0; i < REMOTE_CLIENT_COUNT; i++) {
+ Socket[] aSockets = new Socket[SOCKET_COUNT];
+ String address = i + "";
+ for (int j = 0; j < SOCKET_COUNT; j++) {
+ aSockets[j] = createClosableSocket();
+ trackedSockets.add(aSockets[j]);
+ this.socketCloser.asyncClose(aSockets[j], address, () -> {
+ try {
+ waitingToClose.incrementAndGet();
+ countDownLatch.await();
+ } catch (InterruptedException e) {
+ }
+ });
+ }
}
+
// close the socketCloser first to verify that the sockets
// that have already been scheduled will be still be closed.
- this.socketCloser.releaseResourcesForAddress("A");
this.socketCloser.close();
- // Each thread pool (one for A and one for B) has a max of 8 threads.
- // So verify that this many are currently waiting on cdl.
- {
- final int maxThreads = this.socketCloser.getMaxThreads();
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- return waitingToClose.get() == 2 * maxThreads;
- }
-
- public String description() {
- return "expected " + 2 * maxThreads + " waiters but found only " + waitingToClose.get();
- }
- };
- Wait.waitForCriterion(wc, 5000, 10, true);
- }
- // now count down the latch that allows the sockets to close
- cdl.countDown();
+ countDownLatch.countDown();
// now all the sockets should get closed; use a wait criteria
// since a thread pool is doing to closes
- {
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- for (int i = 0; i < SOCKET_COUNT; i++) {
- if (!aSockets[i].isClosed() || !bSockets[i].isClosed()) {
- return false;
- }
- }
- return true;
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
+ boolean areAllClosed = true;
+ for (Iterator<Socket> iterator = trackedSockets.iterator(); iterator.hasNext();) {
+ Socket socket = iterator.next();
+ if (socket.isClosed()) {
+ iterator.remove();
+ continue;
}
-
- public String description() {
- return "one or more sockets did not close";
- }
- };
- Wait.waitForCriterion(wc, 5000, 10, true);
- }
+ areAllClosed = false;
+ }
+ return areAllClosed;
+ });
}
/**
@@ -150,18 +112,11 @@ public class SocketCloserJUnitTest {
@Test
public void testClosedSocket() throws Exception {
final AtomicBoolean runnableCalled = new AtomicBoolean();
- Runnable r = new Runnable() {
- @Override
- public void run() {
- runnableCalled.set(true);
- }
- };
Socket s = createClosableSocket();
s.close();
- this.socketCloser.asyncClose(s, "A", r);
- Wait.pause(10);
- assertEquals(false, runnableCalled.get());
+ this.socketCloser.asyncClose(s, "A", () -> runnableCalled.set(true));
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> !runnableCalled.get());
}
/**
@@ -170,25 +125,11 @@ public class SocketCloserJUnitTest {
@Test
public void testClosedSocketCloser() {
final AtomicBoolean runnableCalled = new AtomicBoolean();
- Runnable r = new Runnable() {
- @Override
- public void run() {
- runnableCalled.set(true);
- }
- };
- final Socket s = createClosableSocket();
+ final Socket closableSocket = createClosableSocket();
this.socketCloser.close();
- this.socketCloser.asyncClose(s, "A", r);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- return runnableCalled.get() && s.isClosed();
- }
-
- public String description() {
- return "runnable was not called or socket was not closed";
- }
- };
- Wait.waitForCriterion(wc, 5000, 10, true);
+ this.socketCloser.asyncClose(closableSocket, "A", () -> runnableCalled.set(true));
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() -> runnableCalled.get() && closableSocket.isClosed());
}
}