You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/08/09 21:21:22 UTC
geode git commit: GEODE-3416: Reduce synchronization blockages in
SocketCloser. Remove synchronization blocks around HashMap. Replace that
implementation with a simpler ThreadPool that is bounded and does not
grow as remote addresses (clients/peers) are added [Forced Update!]
Repository: geode
Updated Branches:
refs/heads/feature/GEODE-3416 6a0d81267 -> 6d899bdb4 (forced update)
GEODE-3416: Reduce synchronization blockages in SocketCloser.
Remove synchronization blocks around HashMap. Replace that implementation
with a simpler ThreadPool that is bounded and does not grow as
remote addresses (clients/peers) are added.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/6d899bdb
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/6d899bdb
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/6d899bdb
Branch: refs/heads/feature/GEODE-3416
Commit: 6d899bdb45047a2d9d34c22fa544712f71fb7c3b
Parents: b06f69f
Author: Udo Kohlmeyer <uk...@pivotal.io>
Authored: Wed Aug 9 14:17:59 2017 -0700
Committer: Udo Kohlmeyer <uk...@pivotal.io>
Committed: Wed Aug 9 14:21:03 2017 -0700
----------------------------------------------------------------------
.../cache/tier/sockets/CacheClientProxy.java | 51 +-----
.../apache/geode/internal/net/SocketCloser.java | 176 +++++++------------
.../apache/geode/internal/tcp/Connection.java | 4 +-
.../geode/internal/tcp/ConnectionTable.java | 4 -
.../internal/net/SocketCloserJUnitTest.java | 155 +++++-----------
5 files changed, 115 insertions(+), 275 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/6d899bdb/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index d7e3548..98bfed9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -181,11 +181,7 @@ public class CacheClientProxy implements ClientSession {
* True if we are connected to a client.
*/
private volatile boolean connected = false;
- // /**
- // * A string representing interest in all keys
- // */
- // protected static final String ALL_KEYS = "ALL_KEYS";
- //
+
/**
* True if a marker message is still in the ha queue.
*/
@@ -459,47 +455,6 @@ public class CacheClientProxy implements ClientSession {
return this.proxyID;
}
- // the following code was commented out simply because it was not used
- // /**
- // * Determines if the proxy represents the client host (and only the host, not
- // * necessarily the exact VM running on the host)
- // *
- // * @return Whether the proxy represents the client host
- // */
- // protected boolean representsClientHost(String clientHost)
- // {
- // // [bruce] TODO BUGBUGBUG: this should compare InetAddresses, not Strings
- // return this._remoteHostAddress.equals(clientHost);
- // }
-
- // protected boolean representsClientVM(DistributedMember remoteMember)
- // {
- // // logger.warn("Is input port " + clientPort + " contained in " +
- // // logger.warn("Does input host " + clientHost + " equal " +
- // // this._remoteHostAddress+ ": " + representsClientHost(clientHost));
- // // logger.warn("representsClientVM: " +
- // // (representsClientHost(clientHost) && containsPort(clientPort)));
- // return (proxyID.getDistributedMember().equals(remoteMember));
- // }
-
- // /**
- // * Determines if the CacheClientUpdater proxied by this instance is listening
- // * on the input clientHost and clientPort
- // *
- // * @param clientHost
- // * The host name of the client to compare
- // * @param clientPort
- // * The port number of the client to compare
- // *
- // * @return Whether the CacheClientUpdater proxied by this instance is
- // * listening on the input clientHost and clientPort
- // */
- // protected boolean representsCacheClientUpdater(String clientHost,
- // int clientPort)
- // {
- // return (clientPort == this._socket.getPort() && representsClientHost(clientHost));
- // }
-
protected boolean isMember(ClientProxyMembershipID memberId) {
return this.proxyID.equals(memberId);
}
@@ -994,8 +949,7 @@ public class CacheClientProxy implements ClientSession {
private void closeSocket() {
if (this._socketClosed.compareAndSet(false, true)) {
// Close the socket
- this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, this._remoteHostAddress,
- null);
+ this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, null);
getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
}
}
@@ -1009,7 +963,6 @@ public class CacheClientProxy implements ClientSession {
{
String remoteHostAddress = this._remoteHostAddress;
if (remoteHostAddress != null) {
- this._cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress);
this._remoteHostAddress = null;
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/6d899bdb/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
index 6d86fd8..fbbe797 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
@@ -14,11 +14,15 @@
*/
package org.apache.geode.internal.net;
+import org.apache.geode.SystemFailure;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.LoggingThreadGroup;
+import org.apache.logging.log4j.Logger;
+
import java.io.IOException;
import java.net.Socket;
-import java.util.HashMap;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
@@ -26,12 +30,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.SystemFailure;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.LoggingThreadGroup;
-
/**
* This class allows sockets to be closed without blocking. In some cases we have seen a call of
* socket.close block for minutes. This class maintains a thread pool for every other member we have
@@ -51,28 +49,27 @@ public class SocketCloser {
* minutes).
*/
static final long ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS =
- Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120).longValue();
+ Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120);
/**
* Maximum number of threads that can be doing a socket close. Any close requests over this max
* will queue up waiting for a thread.
*/
- static final int ASYNC_CLOSE_POOL_MAX_THREADS =
- Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8).intValue();
+ private static final int ASYNC_CLOSE_POOL_MAX_THREADS =
+ Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8);
/**
* How many milliseconds the synchronous requester waits for the async close to happen. Default is
* 0. Prior releases waited 50ms.
*/
- static final long ASYNC_CLOSE_WAIT_MILLISECONDS =
- Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0).longValue();
+ private static final long ASYNC_CLOSE_WAIT_MILLISECONDS =
+ Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0);
- /** map of thread pools of async close threads */
- private final HashMap<String, ThreadPoolExecutor> asyncCloseExecutors = new HashMap<>();
private final long asyncClosePoolKeepAliveSeconds;
private final int asyncClosePoolMaxThreads;
private final long asyncCloseWaitTime;
private final TimeUnit asyncCloseWaitUnits;
private boolean closed;
+ private final ExecutorService socketCloserThreadPool;
public SocketCloser() {
this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, ASYNC_CLOSE_POOL_MAX_THREADS,
@@ -90,53 +87,25 @@ public class SocketCloser {
this.asyncClosePoolMaxThreads = asyncClosePoolMaxThreads;
this.asyncCloseWaitTime = asyncCloseWaitTime;
this.asyncCloseWaitUnits = asyncCloseWaitUnits;
+
+ final ThreadGroup threadGroup =
+ LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
+ ThreadFactory threadFactory = command -> {
+ Thread thread = new Thread(threadGroup, command);
+ thread.setDaemon(true);
+ return thread;
+ };
+ socketCloserThreadPool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads,
+ this.asyncClosePoolMaxThreads, this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(), threadFactory);
}
public int getMaxThreads() {
return this.asyncClosePoolMaxThreads;
}
- private ThreadPoolExecutor getAsyncThreadExecutor(String address) {
- synchronized (asyncCloseExecutors) {
- ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
- if (pool == null) {
- final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
- ThreadFactory tf = new ThreadFactory() {
- public Thread newThread(final Runnable command) {
- Thread thread = new Thread(tg, command);
- thread.setDaemon(true);
- return thread;
- }
- };
- BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
- pool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads, this.asyncClosePoolMaxThreads,
- this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, tf);
- pool.allowCoreThreadTimeOut(true);
- asyncCloseExecutors.put(address, pool);
- }
- return pool;
- }
- }
-
- /**
- * Call this method if you know all the resources in the closer for the given address are no
- * longer needed. Currently a thread pool is kept for each address and if you know that an address
- * no longer needs its pool then you should call this method.
- */
- public void releaseResourcesForAddress(String address) {
- synchronized (asyncCloseExecutors) {
- ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
- if (pool != null) {
- pool.shutdown();
- asyncCloseExecutors.remove(address);
- }
- }
- }
-
private boolean isClosed() {
- synchronized (asyncCloseExecutors) {
- return this.closed;
- }
+ return this.closed;
}
/**
@@ -144,34 +113,9 @@ public class SocketCloser {
* called then the asyncClose will be done synchronously.
*/
public void close() {
- synchronized (asyncCloseExecutors) {
- if (!this.closed) {
- this.closed = true;
- for (ThreadPoolExecutor pool : asyncCloseExecutors.values()) {
- pool.shutdown();
- }
- asyncCloseExecutors.clear();
- }
- }
- }
-
- private void asyncExecute(String address, Runnable r) {
- // Waiting 50ms for the async close request to complete is what the old (close per thread)
- // code did. But now that we will not create a thread for every close request
- // it seems better to let the thread that requested the close to move on quickly.
- // So the default has changed to not wait. The system property p2p.ASYNC_CLOSE_WAIT_MILLISECONDS
- // can be set to how many milliseconds to wait.
- if (this.asyncCloseWaitTime == 0) {
- getAsyncThreadExecutor(address).execute(r);
- } else {
- Future<?> future = getAsyncThreadExecutor(address).submit(r);
- try {
- future.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- // We want this code to wait at most 50ms for the close to happen.
- // It is ok to ignore these exception and let the close continue
- // in the background.
- }
+ if (!this.closed) {
+ this.closed = true;
+ socketCloserThreadPool.shutdown();
}
}
@@ -181,34 +125,40 @@ public class SocketCloser {
* this method may block for a certain amount of time. If it is called after the SocketCloser is
* closed then a normal synchronous close is done.
*
- * @param sock the socket to close
- * @param address identifies who the socket is connected to
- * @param extra an optional Runnable with stuff to execute in the async thread
+ * @param socket the socket to close
+ * @param runnableCode an optional Runnable with stuff to execute in the async thread
*/
- public void asyncClose(final Socket sock, final String address, final Runnable extra) {
- if (sock == null || sock.isClosed()) {
+ public void asyncClose(final Socket socket, final Runnable runnableCode) {
+ if (socket == null || socket.isClosed()) {
return;
}
+
boolean doItInline = false;
try {
- synchronized (asyncCloseExecutors) {
- if (isClosed()) {
- // this SocketCloser has been closed so do a synchronous, inline, close
- doItInline = true;
- } else {
- asyncExecute(address, new Runnable() {
- public void run() {
- Thread.currentThread().setName("AsyncSocketCloser for " + address);
- try {
- if (extra != null) {
- extra.run();
- }
- inlineClose(sock);
- } finally {
- Thread.currentThread().setName("unused AsyncSocketCloser");
+ if (isClosed()) {
+ // this SocketCloser has been closed so do a synchronous, inline, close
+ doItInline = true;
+ } else {
+ socketCloserThreadPool.execute(() -> {
+ if (runnableCode != null) {
+ runnableCode.run();
+ }
+ inlineClose(socket);
+ });
+ if (this.asyncCloseWaitTime != 0) {
+ try {
+ Future future = socketCloserThreadPool.submit(() -> {
+ if (runnableCode != null) {
+ runnableCode.run();
}
- }
- });
+ inlineClose(socket);
+ });
+ future.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ // We want this code to wait at most 50ms for the close to happen.
+ // It is ok to ignore these exception and let the close continue
+ // in the background.
+ }
}
}
} catch (OutOfMemoryError ignore) {
@@ -217,10 +167,10 @@ public class SocketCloser {
doItInline = true;
}
if (doItInline) {
- if (extra != null) {
- extra.run();
+ if (runnableCode != null) {
+ runnableCode.run();
}
- inlineClose(sock);
+ inlineClose(socket);
}
}
@@ -228,19 +178,19 @@ public class SocketCloser {
/**
* Closes the specified socket
*
- * @param sock the socket to close
+ * @param socket the socket to close
*/
- private static void inlineClose(final Socket sock) {
+ private void inlineClose(final Socket socket) {
// the next two statements are a mad attempt to fix bug
// 36041 - segv in jrockit in pthread signaling code. This
// seems to alleviate the problem.
try {
- sock.shutdownInput();
- sock.shutdownOutput();
+ socket.shutdownInput();
+ socket.shutdownOutput();
} catch (Exception e) {
}
try {
- sock.close();
+ socket.close();
} catch (IOException ignore) {
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
http://git-wip-us.apache.org/repos/asf/geode/blob/6d899bdb/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 0ecb3bf..954a33c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -562,7 +562,7 @@ public class Connection implements Runnable {
} catch (IOException io) {
logger.fatal(LocalizedMessage
.create(LocalizedStrings.Connection_UNABLE_TO_GET_P2P_CONNECTION_STREAMS), io);
- t.getSocketCloser().asyncClose(socket, this.remoteAddr.toString(), null);
+ t.getSocketCloser().asyncClose(socket, null);
throw io;
}
}
@@ -847,7 +847,7 @@ public class Connection implements Runnable {
Socket s = this.socket;
if (s != null && !s.isClosed()) {
prepareForAsyncClose();
- this.owner.getSocketCloser().asyncClose(s, String.valueOf(this.remoteAddr), null);
+ this.owner.getSocketCloser().asyncClose(s, null);
}
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/6d899bdb/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index 044ab42..11c3bb3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -929,10 +929,6 @@ public class ConnectionTable {
owner.getDM().getMembershipManager().getShutdownCause());
}
}
-
- if (remoteAddress != null) {
- this.socketCloser.releaseResourcesForAddress(remoteAddress.toString());
- }
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/6d899bdb/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
index 942cad4..b6dbfe2 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
@@ -14,22 +14,21 @@
*/
package org.apache.geode.internal.net;
-import static org.junit.Assert.*;
-
-import java.net.Socket;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.apache.geode.internal.net.SocketCloser;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.junit.categories.UnitTest;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Tests the default SocketCloser.
@@ -62,86 +61,49 @@ public class SocketCloserJUnitTest {
*/
@Test
public void testAsync() {
- final CountDownLatch cdl = new CountDownLatch(1);
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
final AtomicInteger waitingToClose = new AtomicInteger(0);
- Runnable r = new Runnable() {
- @Override
- public void run() {
- try {
- waitingToClose.incrementAndGet();
- cdl.await();
- } catch (InterruptedException e) {
- }
- }
- };
final int SOCKET_COUNT = 100;
- final Socket[] aSockets = new Socket[SOCKET_COUNT];
- for (int i = 0; i < SOCKET_COUNT; i++) {
- aSockets[i] = createClosableSocket();
- }
- // Schedule a 100 sockets for async close.
- // They should all be stuck on cdl.
- for (int i = 0; i < SOCKET_COUNT; i++) {
- this.socketCloser.asyncClose(aSockets[i], "A", r);
- }
- // Make sure the sockets have not been closed
- for (int i = 0; i < SOCKET_COUNT; i++) {
- assertEquals(false, aSockets[i].isClosed());
- }
- final Socket[] bSockets = new Socket[SOCKET_COUNT];
- for (int i = 0; i < SOCKET_COUNT; i++) {
- bSockets[i] = createClosableSocket();
- }
+ final int REMOTE_CLIENT_COUNT = 200;
+
+ List<Socket> trackedSockets = new ArrayList<>();
// Schedule a 100 sockets for async close.
- // They should all be stuck on cdl.
- for (int i = 0; i < SOCKET_COUNT; i++) {
- this.socketCloser.asyncClose(bSockets[i], "B", r);
- }
- // Make sure the sockets have not been closed
- for (int i = 0; i < SOCKET_COUNT; i++) {
- assertEquals(false, bSockets[i].isClosed());
+ // They should all be stuck on countDownLatch.
+ for (int i = 0; i < REMOTE_CLIENT_COUNT; i++) {
+ Socket[] aSockets = new Socket[SOCKET_COUNT];
+
+ for (int j = 0; j < SOCKET_COUNT; j++) {
+ aSockets[j] = createClosableSocket();
+ trackedSockets.add(aSockets[j]);
+ this.socketCloser.asyncClose(aSockets[j], () -> {
+ try {
+ waitingToClose.incrementAndGet();
+ countDownLatch.await();
+ } catch (InterruptedException e) {
+ }
+ });
+ }
}
+
// close the socketCloser first to verify that the sockets
// that have already been scheduled will be still be closed.
- this.socketCloser.releaseResourcesForAddress("A");
this.socketCloser.close();
- // Each thread pool (one for A and one for B) has a max of 8 threads.
- // So verify that this many are currently waiting on cdl.
- {
- final int maxThreads = this.socketCloser.getMaxThreads();
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- return waitingToClose.get() == 2 * maxThreads;
- }
-
- public String description() {
- return "expected " + 2 * maxThreads + " waiters but found only " + waitingToClose.get();
- }
- };
- Wait.waitForCriterion(wc, 5000, 10, true);
- }
- // now count down the latch that allows the sockets to close
- cdl.countDown();
+ countDownLatch.countDown();
// now all the sockets should get closed; use a wait criteria
// since a thread pool is doing to closes
- {
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- for (int i = 0; i < SOCKET_COUNT; i++) {
- if (!aSockets[i].isClosed() || !bSockets[i].isClosed()) {
- return false;
- }
- }
- return true;
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
+ boolean areAllClosed = true;
+ for (Iterator<Socket> iterator = trackedSockets.iterator(); iterator.hasNext();) {
+ Socket socket = iterator.next();
+ if (socket.isClosed()) {
+ iterator.remove();
+ continue;
}
-
- public String description() {
- return "one or more sockets did not close";
- }
- };
- Wait.waitForCriterion(wc, 5000, 10, true);
- }
+ areAllClosed = false;
+ }
+ return areAllClosed;
+ });
}
/**
@@ -150,18 +112,11 @@ public class SocketCloserJUnitTest {
@Test
public void testClosedSocket() throws Exception {
final AtomicBoolean runnableCalled = new AtomicBoolean();
- Runnable r = new Runnable() {
- @Override
- public void run() {
- runnableCalled.set(true);
- }
- };
Socket s = createClosableSocket();
s.close();
- this.socketCloser.asyncClose(s, "A", r);
- Wait.pause(10);
- assertEquals(false, runnableCalled.get());
+ this.socketCloser.asyncClose(s, () -> runnableCalled.set(true));
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> !runnableCalled.get());
}
/**
@@ -170,25 +125,11 @@ public class SocketCloserJUnitTest {
@Test
public void testClosedSocketCloser() {
final AtomicBoolean runnableCalled = new AtomicBoolean();
- Runnable r = new Runnable() {
- @Override
- public void run() {
- runnableCalled.set(true);
- }
- };
- final Socket s = createClosableSocket();
+ final Socket closableSocket = createClosableSocket();
this.socketCloser.close();
- this.socketCloser.asyncClose(s, "A", r);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- return runnableCalled.get() && s.isClosed();
- }
-
- public String description() {
- return "runnable was not called or socket was not closed";
- }
- };
- Wait.waitForCriterion(wc, 5000, 10, true);
+ this.socketCloser.asyncClose(closableSocket, () -> runnableCalled.set(true));
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() -> runnableCalled.get() && closableSocket.isClosed());
}
}