Posted to commits@geode.apache.org by vf...@apache.org on 2015/11/25 20:07:21 UTC

[08/50] [abbrv] incubator-geode git commit: GEODE-332: use thread pools for p2p readers and async close

GEODE-332: use thread pools for p2p readers and async close

The old code always created a brand new thread whenever it
needed to async close a socket, create a new p2p reader,
or handshake with a p2p reader. Now it reuses threads,
which improves latency, and a hard cap on the maximum
number of closer threads prevents a large number of close
threads from causing an OutOfMemoryError.

Introduced a new SocketCloser class for async close.
The ConnectionTable has a SocketCloser instance
for closing peer-to-peer sockets and the
CacheClientNotifier has one for closing sockets
used by a cache server to send queue data to clients.
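
A rough sketch of the intended lifecycle (the method names
are from the new class; the socket and address values here
are only illustrative):

  SocketCloser closer = new SocketCloser();
  java.net.Socket sock = new java.net.Socket(); // stand-in for a connected peer socket
  String peer = "remote-member-address";        // key selecting the per-address pool
  closer.asyncClose(sock, peer, null);          // returns immediately by default
  closer.releaseResourcesForAddress(peer);      // peer is gone, so drop its pool
  closer.close();                               // later asyncClose calls run synchronously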

The ConnectionTable closer will have at most 8 threads
per address its sockets are connected to. If these
threads are not used for 120 seconds they will time out.
This timeout can be configured using the
p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS system property.
The maximum number of threads per address can be changed
from 8 using the p2p.ASYNC_CLOSE_POOL_MAX_THREADS system
property. By default, when an async socket close is
requested, the requestor does not wait for the close to
be done. In previous releases the requestor waited
50 milliseconds. The wait can now be configured using the
p2p.ASYNC_CLOSE_WAIT_MILLISECONDS system property.
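
For example, restoring the old 50 millisecond wait and
raising the per-address thread cap could be done with JVM
settings like these (the values are illustrative):

  -Dp2p.ASYNC_CLOSE_WAIT_MILLISECONDS=50
  -Dp2p.ASYNC_CLOSE_POOL_MAX_THREADS=16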

The CacheClientNotifier closer will have at most 1 thread
per address its sockets are connected to. If these
threads are not used for 120 seconds they will time out.
This timeout can be configured using the
p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS system property.
This closer also forces all requestors to wait
50 milliseconds for the close to be done.
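
This is how the notifier constructs its closer in this
change:

  // from CacheClientNotifier: one async close thread per client address,
  // and each close request waits up to 50ms for the close to finish
  this.socketCloser = new SocketCloser(1, 50);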

ConnectionTable also uses a thread pool whenever
it needs a thread for a p2p reader or when a p2p sender
needs a thread to do the initial handshake. This pool
has an unlimited number of threads, but if a thread is
not used for 120 seconds it will time out. This timeout
can be configured using the p2p.READER_POOL_KEEP_ALIVE_TIME
system property.
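
When sockets are not conserved, the patch builds that pool
essentially as follows (condensed from
ConnectionTable.createThreadPoolForIO; threadFactory is the
daemon-thread factory created in the same method):

  // core size 1, effectively unbounded maximum; idle reader/handshake
  // threads die after p2p.READER_POOL_KEEP_ALIVE_TIME seconds (default 120)
  Executor pool = new ThreadPoolExecutor(1, Integer.MAX_VALUE,
      READER_POOL_KEEP_ALIVE_TIME, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), threadFactory);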


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7fcb2fd9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7fcb2fd9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7fcb2fd9

Branch: refs/heads/feature/GEODE-78
Commit: 7fcb2fd938338c9fde2a152364c094c196c5a045
Parents: eb89661
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Sep 15 16:03:40 2015 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Wed Sep 23 10:13:54 2015 -0700

----------------------------------------------------------------------
 .../gemstone/gemfire/internal/SocketCloser.java | 241 +++++++++++++++++++
 .../gemfire/internal/SocketCreator.java         |  98 --------
 .../cache/tier/sockets/CacheClientNotifier.java |  10 +
 .../cache/tier/sockets/CacheClientProxy.java    |  32 ++-
 .../gemfire/internal/tcp/Connection.java        | 130 ++++++----
 .../gemfire/internal/tcp/ConnectionTable.java   |  95 +++++++-
 .../gemfire/internal/SocketCloserJUnitTest.java | 180 ++++++++++++++
 .../internal/SocketCloserWithWaitJUnitTest.java |  22 ++
 8 files changed, 639 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fcb2fd9/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java
new file mode 100644
index 0000000..8468daa
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java
@@ -0,0 +1,241 @@
+package com.gemstone.gemfire.internal;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+
+/**
+ * This class allows sockets to be closed without blocking.
+ * In some cases we have seen a call of socket.close block for minutes.
+ * This class maintains a thread pool for every other member we have
+ * connected sockets to. Any request to close by default returns immediately
+ * to the caller while the close is called by a background thread.
+ * The requester can wait for a configured amount of time by setting
+ * the "p2p.ASYNC_CLOSE_WAIT_MILLISECONDS" system property.
+ * Idle threads that are not doing a close will timeout after 2 minutes.
+ * This can be configured by setting the
+ * "p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS" system property.
+ * A pool exists for each remote address that we have a socket connected to.
+ * That way if close is taking a long time to one address we can still get closes
+ * done to another address.
+ * Each address pool by default has at most 8 threads. This max threads can be
+ * configured using the "p2p.ASYNC_CLOSE_POOL_MAX_THREADS" system property.
+ */
+public class SocketCloser {
+  private static final Logger logger = LogService.getLogger();
+  /** Number of seconds to wait before timing out an unused async close thread. Default is 120 (2 minutes). */
+  static final long ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS = Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120).longValue();
+  /** Maximum number of threads that can be doing a socket close. Any close requests over this max will queue up waiting for a thread. */
+  static final int ASYNC_CLOSE_POOL_MAX_THREADS = Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8).intValue();
+  /** How many milliseconds the synchronous requester waits for the async close to happen. Default is 0. Prior releases waited 50ms. */ 
+  static final long ASYNC_CLOSE_WAIT_MILLISECONDS = Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0).longValue();
+  
+
+  /** map of thread pools of async close threads */
+  private final HashMap<String, ThreadPoolExecutor> asyncCloseExecutors = new HashMap<>();
+  private final long asyncClosePoolKeepAliveSeconds;
+  private final int asyncClosePoolMaxThreads;
+  private final long asyncCloseWaitTime;
+  private final TimeUnit asyncCloseWaitUnits;
+  private boolean closed;
+  
+  public SocketCloser() {
+    this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, ASYNC_CLOSE_POOL_MAX_THREADS, ASYNC_CLOSE_WAIT_MILLISECONDS, TimeUnit.MILLISECONDS);
+  }
+  public SocketCloser(int asyncClosePoolMaxThreads, long asyncCloseWaitMillis) {
+    this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, asyncClosePoolMaxThreads, asyncCloseWaitMillis, TimeUnit.MILLISECONDS);
+  }
+  public SocketCloser(long asyncClosePoolKeepAliveSeconds, int asyncClosePoolMaxThreads, long asyncCloseWaitTime, TimeUnit asyncCloseWaitUnits) {
+    this.asyncClosePoolKeepAliveSeconds = asyncClosePoolKeepAliveSeconds;
+    this.asyncClosePoolMaxThreads = asyncClosePoolMaxThreads;
+    this.asyncCloseWaitTime = asyncCloseWaitTime;
+    this.asyncCloseWaitUnits = asyncCloseWaitUnits;
+  }
+  
+  public int getMaxThreads() {
+    return this.asyncClosePoolMaxThreads;
+  }
+
+  private ThreadPoolExecutor getAsyncThreadExecutor(String address) {
+    synchronized (asyncCloseExecutors) {
+      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
+      if (pool == null) {
+        final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
+        ThreadFactory tf = new ThreadFactory() { 
+          public Thread newThread(final Runnable command) { 
+            Thread thread = new Thread(tg, command); 
+            thread.setDaemon(true);
+            return thread;
+          } 
+        }; 
+        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(); 
+        pool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads, this.asyncClosePoolMaxThreads, this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, tf);
+        pool.allowCoreThreadTimeOut(true);
+        asyncCloseExecutors.put(address, pool);
+      }
+      return pool;
+    }
+  }
+  /**
+   * Call this method if you know all the resources in the closer
+   * for the given address are no longer needed.
+   * Currently a thread pool is kept for each address and if you
+   * know that an address no longer needs its pool then you should
+   * call this method.
+   */
+  public void releaseResourcesForAddress(String address) {
+    synchronized (asyncCloseExecutors) {
+      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
+      if (pool != null) {
+        pool.shutdown();
+        asyncCloseExecutors.remove(address);
+      }
+    }
+  }
+  private boolean isClosed() {
+    synchronized (asyncCloseExecutors) {
+      return this.closed;
+    }
+  }
+  /**
+   * Call close when you are all done with your socket closer.
+   * If you call asyncClose after close is called then the
+   * asyncClose will be done synchronously.
+   */
+  public void close() {
+    synchronized (asyncCloseExecutors) {
+      if (!this.closed) {
+        this.closed = true;
+        for (ThreadPoolExecutor pool: asyncCloseExecutors.values()) {
+          pool.shutdown();
+        }
+        asyncCloseExecutors.clear();
+      }
+    }
+  }
+  private void asyncExecute(String address, Runnable r) {
+    // Waiting 50ms for the async close request to complete is what the old (thread-per-close)
+    // code did. But now that we will not create a thread for every close request,
+    // it seems better to let the thread that requested the close move on quickly.
+    // So the default has changed to not wait. The system property p2p.ASYNC_CLOSE_WAIT_MILLISECONDS
+    // can be set to how many milliseconds to wait.
+    if (this.asyncCloseWaitTime == 0) {
+      getAsyncThreadExecutor(address).execute(r);
+    } else {
+      Future<?> future = getAsyncThreadExecutor(address).submit(r);
+      try {
+        future.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
+      } catch (InterruptedException | ExecutionException | TimeoutException e) {
+        // We want this code to wait at most the configured wait time for the close to happen.
+        // It is ok to ignore these exceptions and let the close continue
+        // in the background.
+      }
+    }
+  }
+  /**
+   * Closes the specified socket in a background thread.
+   * In some cases we see close hang (see bug 33665).
+   * Depending on how the SocketCloser is configured (see ASYNC_CLOSE_WAIT_MILLISECONDS)
+   * this method may block for a certain amount of time.
+   * If it is called after the SocketCloser is closed then a normal
+   * synchronous close is done.
+   * @param sock the socket to close
+   * @param address identifies who the socket is connected to
+   * @param extra an optional Runnable with stuff to execute in the async thread
+   */
+  public void asyncClose(final Socket sock, final String address, final Runnable extra) {
+    if (sock == null || sock.isClosed()) {
+      return;
+    }
+    boolean doItInline = false;
+    try {
+      synchronized (asyncCloseExecutors) {
+        if (isClosed()) {
+          // this SocketCloser has been closed so do a synchronous, inline, close
+          doItInline = true;
+        } else {
+          asyncExecute(address, new Runnable() {
+            public void run() {
+              Thread.currentThread().setName("AsyncSocketCloser for " + address);
+              try {
+                if (extra != null) {
+                  extra.run();
+                }
+                inlineClose(sock);
+              } finally {
+                Thread.currentThread().setName("unused AsyncSocketCloser");
+              }
+            }
+          });
+        }
+      }
+    } catch (OutOfMemoryError ignore) {
+      // If we can't start a thread to close the socket just do it inline.
+      // See bug 50573.
+      doItInline = true;
+    }
+    if (doItInline) {
+      if (extra != null) {
+        extra.run();
+      }
+      inlineClose(sock);
+    }
+  }
+  
+
+  /**
+   * Closes the specified socket
+   * @param sock the socket to close
+   */
+  private static void inlineClose(final Socket sock) {
+    // the next two statements are a mad attempt to fix bug
+    // 36041 - segv in jrockit in pthread signaling code.  This
+    // seems to alleviate the problem.
+    try {
+      sock.shutdownInput();
+      sock.shutdownOutput();
+    } catch (Exception e) {
+    }
+    try {
+      sock.close();
+    } catch (IOException ignore) {
+    } catch (VirtualMachineError err) {
+      SystemFailure.initiateFailure(err);
+      // If this ever returns, rethrow the error.  We're poisoned
+      // now, so don't let this thread continue.
+      throw err;
+    } catch (java.security.ProviderException pe) {
+      // some ssl implementations have trouble with termination and throw
+      // this exception.  See bug #40783
+    } catch (Error e) {
+      // Whenever you catch Error or Throwable, you must also
+      // catch VirtualMachineError (see above).  However, there is
+      // _still_ a possibility that you are dealing with a cascading
+      // error condition, so you also need to check to see if the JVM
+      // is still usable:
+      SystemFailure.checkFailure();
+      // Sun's NIO implementation has been known to throw Errors
+      // that are caused by IOExceptions.  If this is the case, it's
+      // okay.
+      if (e.getCause() instanceof IOException) {
+        // okay...
+      } else {
+        throw e;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fcb2fd9/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
index ff4a22c..940936f 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
@@ -75,7 +75,6 @@ import com.gemstone.gemfire.internal.cache.wan.TransportFilterServerSocket;
 import com.gemstone.gemfire.internal.cache.wan.TransportFilterSocketFactory;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.util.PasswordUtil;
 import com.gemstone.org.jgroups.util.ConnectionWatcher;
@@ -1197,103 +1196,6 @@ public class SocketCreator  implements com.gemstone.org.jgroups.util.SockCreator
     return (String[]) v.toArray( new String[ v.size() ] );
   }
   
-  /**
-   * Closes the specified socket in a background thread and waits a limited
-   * amount of time for the close to complete. In some cases we see close
-   * hang (see bug 33665).
-   * Made public so it can be used from CacheClientProxy.
-   * @param sock the socket to close
-   * @param who who the socket is connected to
-   * @param extra an optional Runnable with stuff to execute in the async thread
-   */
-  public static void asyncClose(final Socket sock, String who, final Runnable extra) {
-    if (sock == null || sock.isClosed()) {
-      return;
-    }
-    try {
-    ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
-
-    Thread t = new Thread(tg, new Runnable() {
-        public void run() {
-          if (extra != null) {
-            extra.run();
-          }
-          inlineClose(sock);
-        }
-      }, "AsyncSocketCloser for " + who);
-    t.setDaemon(true);
-    try {
-      t.start();
-    } catch (OutOfMemoryError ignore) {
-      // If we can't start a thread to close the socket just do it inline.
-      // See bug 50573.
-      inlineClose(sock);
-      return;
-    }
-    try {
-      // [bruce] if the network fails, this will wait the full amount of time
-      // on every close, so it must be kept very short.  it was 750ms before,
-      // causing frequent hangs in net-down hydra tests
-      t.join(50/*ms*/);
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    }
-    }
-    catch (VirtualMachineError e) {
-      SystemFailure.initiateFailure(e);
-      // NOTREACHED
-      throw e;
-    }
-  }
-  
-
-  /**
-   * Closes the specified socket
-   * @param sock the socket to close
-   */
-  public static void inlineClose(final Socket sock) {
-    
-    // the next two statements are a mad attempt to fix bug
-    // 36041 - segv in jrockit in pthread signaling code.  This
-    // seems to alleviate the problem.
-    try {
-      sock.shutdownInput();
-      sock.shutdownOutput();
-    }
-    catch (Exception e) {
-    }
-    try {
-      sock.close();
-    } catch (IOException ignore) {
-    } 
-    catch (VirtualMachineError err) {
-      SystemFailure.initiateFailure(err);
-      // If this ever returns, rethrow the error.  We're poisoned
-      // now, so don't let this thread continue.
-      throw err;
-    }
-    catch (java.security.ProviderException pe) {
-      // some ssl implementations have trouble with termination and throw
-      // this exception.  See bug #40783
-    }
-    catch (Error e) {
-      // Whenever you catch Error or Throwable, you must also
-      // catch VirtualMachineError (see above).  However, there is
-      // _still_ a possibility that you are dealing with a cascading
-      // error condition, so you also need to check to see if the JVM
-      // is still usable:
-      SystemFailure.checkFailure();
-      // Sun's NIO implementation has been known to throw Errors
-      // that are caused by IOExceptions.  If this is the case, it's
-      // okay.
-      if (e.getCause() instanceof IOException) {
-        // okay...
-
-      } else {
-        throw e;
-      }
-    }
-  }
   
   protected void initializeClientSocketFactory() {
     this.clientSocketFactory = null;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fcb2fd9/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
index 2cede25..deddfd1 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -73,6 +73,7 @@ import com.gemstone.gemfire.internal.ClassLoadUtil;
 import com.gemstone.gemfire.internal.DummyStatisticsFactory;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.InternalInstantiator;
+import com.gemstone.gemfire.internal.SocketCloser;
 import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.Version;
@@ -1668,6 +1669,8 @@ public class CacheClientNotifier {
 
       // Close the statistics
       this._statistics.close();
+      
+      this.socketCloser.close();
     } 
   }
 
@@ -2120,6 +2123,7 @@ public class CacheClientNotifier {
     // Set the Cache
     this.setCache((GemFireCacheImpl)cache);
     this.acceptorStats = acceptorStats;
+    this.socketCloser = new SocketCloser(1, 50); // we only need one thread per client and wait 50ms for close
 
     // Set the LogWriter
     this.logWriter = (InternalLogWriter)cache.getLogger();
@@ -2385,6 +2389,10 @@ public class CacheClientNotifier {
     return this.acceptorStats;
   }
   
+  public SocketCloser getSocketCloser() {
+    return this.socketCloser;
+  }
+  
   public void addCompiledQuery(DefaultQuery query){
     if (this.compiledQueries.putIfAbsent(query.getQueryString(), query) == null){
       // Added successfully.
@@ -2651,6 +2659,8 @@ public class CacheClientNotifier {
 
   private SystemTimer.SystemTimerTask clientPingTask;
   
+  private final SocketCloser socketCloser;
+  
   private static final long CLIENT_PING_TASK_PERIOD =
     Long.getLong("gemfire.serverToClientPingPeriod", 60000);
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fcb2fd9/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
index 15f83bb..85c7493 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
@@ -63,7 +63,6 @@ import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
 import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.internal.DistributionManager;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
 import com.gemstone.gemfire.internal.Version;
@@ -116,6 +115,8 @@ public class CacheClientProxy implements ClientSession {
    * The socket between the server and the client
    */
   protected Socket _socket;
+  
+  private final AtomicBoolean _socketClosed = new AtomicBoolean();
 
   /**
    * A communication buffer used by each message we send to the client
@@ -960,10 +961,7 @@ public class CacheClientProxy implements ClientSession {
       // to fix bug 37684
       // 1. check to see if dispatcher is still alive
       if (this._messageDispatcher.isAlive()) {
-        if (this._socket != null && !this._socket.isClosed()) {
-          SocketCreator.asyncClose(this._socket, this._remoteHostAddress, null);
-          getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
-        }
+        closeSocket();
         destroyRQ();
         alreadyDestroyed = true;
         this._messageDispatcher.interrupt();
@@ -996,19 +994,27 @@ public class CacheClientProxy implements ClientSession {
     }
   }
 
-  private void closeTransientFields() {
-    // Close the socket
-    if (this._socket != null && !this._socket.isClosed()) {
-      try {
-        this._socket.close();
-        getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
-      } catch (IOException e) {/*ignore*/}
+  private void closeSocket() {
+    if (this._socketClosed.compareAndSet(false, true)) {
+      // Close the socket
+      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, this._remoteHostAddress, null);
+      getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
     }
+  }
+  
+  private void closeTransientFields() {
+    closeSocket();
 
     // Null out comm buffer, host address, ports and proxy id. All will be
     // replaced when the client reconnects.
     releaseCommBuffer();
-    this._remoteHostAddress = null;
+    {
+      String remoteHostAddress = this._remoteHostAddress;
+      if (remoteHostAddress != null) {
+        this._cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress);
+        this._remoteHostAddress = null;
+      }
+    }
     try {
       this.cils[RegisterInterestTracker.interestListIndex].clearClientInterestList();
     } catch (CacheClosedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fcb2fd9/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
index cd1b7dc..88bca22 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
@@ -62,6 +62,7 @@ import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.ByteArrayDataInput;
 import com.gemstone.gemfire.internal.DSFIDFactory;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
+import com.gemstone.gemfire.internal.SocketCloser;
 import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.SystemTimer;
@@ -115,6 +116,11 @@ public class Connection implements Runnable {
 
   /** the table holding this connection */
   final ConnectionTable owner;
+  
+  /** True while run() is executing; set to false as run() terminates. Used instead of
+    * Thread.isAlive because the reader thread may be a pooled thread.
+    */ 
+  private volatile boolean isRunning = false; 
 
   /** true if connection is a shared resource that can be used by more than one thread */
   private boolean sharedResource;
@@ -136,11 +142,14 @@ public class Connection implements Runnable {
   }
 
   private final static ThreadLocal isReaderThread = new ThreadLocal();
-  // return true if this thread is a reader thread
   public final static void makeReaderThread() {
     // mark this thread as a reader thread
-    isReaderThread.set(Boolean.TRUE);
+    makeReaderThread(true);
   }
+  private final static void makeReaderThread(boolean v) {
+    isReaderThread.set(v);
+  }
+  // return true if this thread is a reader thread
   public final static boolean isReaderThread() {
     Object o = isReaderThread.get();
     if (o == null) {
@@ -319,7 +328,7 @@ public class Connection implements Runnable {
   private final Object handshakeSync = new Object();
 
   /** message reader thread */
-  Thread readerThread;
+  private volatile Thread readerThread;
 
 //  /**
 //   * When a thread owns the outLock and is writing to the socket, it must
@@ -523,7 +532,7 @@ public class Connection implements Runnable {
     Connection c = new Connection(t, s);
     boolean readerStarted = false;
     try {
-      c.startReader();
+      c.startReader(t);
       readerStarted = true;
     } finally {
       if (!readerStarted) {
@@ -571,7 +580,7 @@ public class Connection implements Runnable {
       }
       catch (IOException io) {
         logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_UNABLE_TO_GET_P2P_CONNECTION_STREAMS), io);
-        SocketCreator.asyncClose(s, this.remoteAddr.toString(), null);
+        t.getSocketCloser().asyncClose(s, this.remoteAddr.toString(), null);
         throw io;
       }
     }
@@ -809,6 +818,8 @@ public class Connection implements Runnable {
     }
   }
   
+  private final AtomicBoolean asyncCloseCalled = new AtomicBoolean();
+  
   /**
    * asynchronously close this connection
    * 
@@ -819,28 +830,31 @@ public class Connection implements Runnable {
     
     // we do the close in a background thread because the operation may hang if 
     // there is a problem with the network.  See bug #46659
-    Runnable r = new Runnable() {
-      public void run() {
-        boolean rShuttingDown = readerShuttingDown;
-        synchronized(stateLock) {
-          if (readerThread != null && readerThread.isAlive() &&
-              !rShuttingDown && connectionState == STATE_READING
-              || connectionState == STATE_READING_ACK) {
-            readerThread.interrupt();
-          }
-        }
-      }
-    };
+
     // if simulating sickness, sockets must be closed in-line so that tests know
     // that the vm is sick when the beSick operation completes
     if (beingSick) {
-      r.run();
+      prepareForAsyncClose();
     }
     else {
-      SocketCreator.asyncClose(this.socket, String.valueOf(this.remoteAddr), r);
+      if (this.asyncCloseCalled.compareAndSet(false, true)) {
+        Socket s = this.socket;
+        if (s != null && !s.isClosed()) {
+          prepareForAsyncClose();
+          this.owner.getSocketCloser().asyncClose(s, String.valueOf(this.remoteAddr), null);
+        }
+      }
     }
   }
   
+  private void prepareForAsyncClose() {
+    synchronized(stateLock) {
+      if (readerThread != null && isRunning && !readerShuttingDown
+          && (connectionState == STATE_READING || connectionState == STATE_READING_ACK)) {
+        readerThread.interrupt();
+      }
+    }
+  }
 
   private static final int CONNECT_HANDSHAKE_SIZE = 4096;
 
@@ -951,7 +965,7 @@ public class Connection implements Runnable {
    *
    * @throws IOException if handshake fails
    */
-  private void attemptHandshake() throws IOException {
+  private void attemptHandshake(ConnectionTable connTable) throws IOException {
     // send HANDSHAKE
     // send this server's port.  It's expected on the other side
     if (useNIO()) {
@@ -961,7 +975,7 @@ public class Connection implements Runnable {
       handshakeStream();
     }
 
-    startReader(); // this reader only reads the handshake and then exits
+    startReader(connTable); // this reader only reads the handshake and then exits
     waitForHandshake(); // waiting for reply
   }
 
@@ -1099,7 +1113,7 @@ public class Connection implements Runnable {
         if (conn != null) {
           // handshake
           try {
-            conn.attemptHandshake();
+            conn.attemptHandshake(t);
             if (conn.isSocketClosed()) {
               // something went wrong while reading the handshake
               // and the socket was closed or this guy sent us a
@@ -1601,26 +1615,35 @@ public class Connection implements Runnable {
         this.owner.owner.getLocalId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
       isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor"));
     }
-    if (!beingSick && this.readerThread != null && !isIBM && this.readerThread.isAlive()
-        && this.readerThread != Thread.currentThread()) {
-      try {
-        this.readerThread.join(500);
-        if (this.readerThread.isAlive() && !this.readerShuttingDown
-            && owner.getDM().getRootCause() == null) { // don't wait twice if there's a system failure
-          this.readerThread.join(1500);
-          if (this.readerThread.isAlive()) {
-            logger.info(LocalizedMessage.create(LocalizedStrings.Connection_TIMED_OUT_WAITING_FOR_READERTHREAD_ON_0_TO_FINISH, this));
+    {
+      // Now that readerThread is returned to a pool after we close
+      // we need to be more careful not to join on a thread that belongs
+      // to someone else.
+      Thread readerThreadSnapshot = this.readerThread;
+      if (!beingSick && readerThreadSnapshot != null && !isIBM
+          && this.isRunning && !this.readerShuttingDown
+          && readerThreadSnapshot != Thread.currentThread()) {
+        try {
+          readerThreadSnapshot.join(500);
+          readerThreadSnapshot = this.readerThread;
+          if (this.isRunning && !this.readerShuttingDown
+              && readerThreadSnapshot != null
+              && owner.getDM().getRootCause() == null) { // don't wait twice if there's a system failure
+            readerThreadSnapshot.join(1500);
+            if (this.isRunning) {
+              logger.info(LocalizedMessage.create(LocalizedStrings.Connection_TIMED_OUT_WAITING_FOR_READERTHREAD_ON_0_TO_FINISH, this));
+            }
           }
         }
+        catch (IllegalThreadStateException ignore) {
+          // ignored - thread already stopped
+        }
+        catch (InterruptedException ignore) {
+          Thread.currentThread().interrupt();
+          // but keep going, we're trying to close.
+        }
       }
-      catch (IllegalThreadStateException ignore) {
-        // ignored - thread already stopped
-      }
-      catch (InterruptedException ignore) {
-        Thread.currentThread().interrupt();
-        // but keep going, we're trying to close.
-      }
-    } // !onlyCleanup
+    }
 
     closeBatchBuffer();
     closeAllMsgDestreamers();
@@ -1677,26 +1700,22 @@ public class Connection implements Runnable {
   }
 
   /** starts a reader thread */
-  private void startReader() {
-    ThreadGroup group =
-      LoggingThreadGroup.createThreadGroup("P2P Reader Threads", logger);
-    Assert.assertTrue(this.readerThread == null);
-    this.readerThread =
-      new Thread(group, this, p2pReaderName());
-    this.readerThread.setDaemon(true);
-    stopped = false;
-    this.readerThread.start();
-  }
+  private void startReader(ConnectionTable connTable) { 
+    Assert.assertTrue(!this.isRunning); 
+    stopped = false; 
+    this.isRunning = true; 
+    connTable.executeCommand(this);  
+  } 
 
 
   /** in order to read non-NIO socket-based messages we need to have a thread
       actively trying to grab bytes out of the sockets input queue.
       This is that thread. */
   public void run() {
+    this.readerThread = Thread.currentThread();
+    this.readerThread.setName(p2pReaderName());
     ConnectionTable.threadWantsSharedResources();
-    if (this.isReceiver) {
-      makeReaderThread();
-    }
+    makeReaderThread(this.isReceiver);
     try {
       if (useNIO()) {
         runNioReader();
@@ -1725,6 +1744,11 @@ public class Connection implements Runnable {
       // for the handshake.
       // see bug 37524 for an example of listeners hung in waitForHandshake
       notifyHandshakeWaiter(false);
+      this.readerThread.setName("unused p2p reader");
+      synchronized (this.stateLock) {
+        this.isRunning = false;
+        this.readerThread = null;
+      }
     } // finally
   }
 
@@ -3307,7 +3331,7 @@ public class Connection implements Runnable {
   protected Object stateLock = new Object();
   
   /** for timeout processing, this is the current state of the connection */
-  protected byte connectionState;
+  protected byte connectionState = STATE_IDLE;
   
   /*~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~*/
   /** the connection is idle, but may be in use */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fcb2fd9/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
index 9beb947..508eba2 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
@@ -19,8 +19,15 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.logging.log4j.Logger;
@@ -33,10 +40,12 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM
 import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
 import com.gemstone.gemfire.distributed.internal.membership.jgroup.JGroupMembershipManager;
 import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.SocketCloser;
 import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 
@@ -129,7 +138,15 @@ public class ConnectionTable  {
    */
   private volatile boolean closed = false;
 
-
+  /**
+   * Executor used by p2p reader and p2p handshaker threads.
+   */
+  private final Executor p2pReaderThreadPool;
+  /** Number of seconds to wait before timing out an unused p2p reader thread. Default is 120 (2 minutes). */
+  private final static long READER_POOL_KEEP_ALIVE_TIME = Long.getLong("p2p.READER_POOL_KEEP_ALIVE_TIME", 120).longValue();
+  
+  private final SocketCloser socketCloser;
+  
   /**
    * The most recent instance to be created
    * 
@@ -202,11 +219,41 @@ public class ConnectionTable  {
     this.threadOrderedConnMap = new ThreadLocal();
     this.threadConnMaps = new ArrayList();
     this.threadConnectionMap = new ConcurrentHashMap();
+    this.p2pReaderThreadPool = createThreadPoolForIO(c.getDM().getSystem().isShareSockets());
+    this.socketCloser = new SocketCloser();
   /*  NOMUX: if (TCPConduit.useNIO) {
       inputMuxManager = new InputMuxManager(this);
       inputMuxManager.start(c.logger);
     }*/
   }
+  
+  private Executor createThreadPoolForIO(boolean conserveSockets) { 
+    Executor executor = null; 
+    final ThreadGroup connectionRWGroup = LoggingThreadGroup.createThreadGroup("P2P Reader Threads", logger);
+    if (conserveSockets) { 
+      executor = new Executor() { 
+        @Override 
+        public void execute(Runnable command) { 
+          Thread th = new Thread(connectionRWGroup, command); 
+          th.setDaemon(true); 
+          th.start(); 
+        } 
+      }; 
+    } 
+    else { 
+      BlockingQueue synchronousQueue = new SynchronousQueue(); 
+      ThreadFactory tf = new ThreadFactory() { 
+        public Thread newThread(final Runnable command) { 
+          Thread thread = new Thread(connectionRWGroup, command); 
+          thread.setDaemon(true); 
+          return thread; 
+        } 
+      }; 
+      executor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, READER_POOL_KEEP_ALIVE_TIME, 
+          TimeUnit.SECONDS, synchronousQueue, tf); 
+    } 
+    return executor; 
+  } 
 
   /** conduit sends connected() after establishing the server socket */
 //   protected void connected() {
@@ -715,6 +762,14 @@ public class ConnectionTable  {
         this.threadConnMaps.clear();
       }
     }
+    {
+      Executor localExec = this.p2pReaderThreadPool;
+      if (localExec != null) {
+        if (localExec instanceof ExecutorService) {
+          ((ExecutorService)localExec).shutdown();
+        }
+      }
+    }
     closeReceivers(false);
     
     Map m = (Map)this.threadOrderedConnMap.get();
@@ -724,8 +779,16 @@ public class ConnectionTable  {
         m.clear();
       }        
     }
+    this.socketCloser.close();
   }
 
+  public void executeCommand(Runnable runnable) { 
+    Executor local = this.p2pReaderThreadPool;
+    if (local != null) {
+      local.execute(runnable);
+    }
+  }
+  
   /**
    * Close all receiving threads.  This is used during shutdown and is also
    * used by a test hook that makes us deaf to incoming messages.
@@ -800,11 +863,20 @@ public class ConnectionTable  {
     }
 
     if (needsRemoval) {
+      InternalDistributedMember remoteAddress = null;
       synchronized (this.orderedConnectionMap) {
-           closeCon(reason, this.orderedConnectionMap.remove(stub));
+        Object c = this.orderedConnectionMap.remove(stub);
+        if (c instanceof Connection) {
+          remoteAddress = ((Connection) c).getRemoteAddress();
+        }
+        closeCon(reason, c);
       }
       synchronized (this.unorderedConnectionMap) {
-         closeCon(reason, this.unorderedConnectionMap.remove(stub));
+        Object c = this.unorderedConnectionMap.remove(stub);
+        if (remoteAddress == null && (c instanceof Connection)) {
+          remoteAddress = ((Connection) c).getRemoteAddress();
+        }
+        closeCon(reason, c);
       }
 
       {
@@ -813,8 +885,13 @@ public class ConnectionTable  {
           ArrayList al = (ArrayList)cm.remove(stub);
           if (al != null) {
             synchronized (al) {
-              for (Iterator it=al.iterator(); it.hasNext();)
-                closeCon(reason, it.next());
+              for (Iterator it=al.iterator(); it.hasNext();) {
+                Object c = it.next();
+                if (remoteAddress == null && (c instanceof Connection)) {
+                  remoteAddress = ((Connection) c).getRemoteAddress();
+                }
+                closeCon(reason, c);
+              }
               al.clear();
             }
           }
@@ -867,9 +944,17 @@ public class ConnectionTable  {
       if (notifyDisconnect) {
         owner.getMemberForStub(stub, false);
       }
+      
+      if (remoteAddress != null) {
+        this.socketCloser.releaseResourcesForAddress(remoteAddress.toString());
+      }
     }
   }
   
+  SocketCloser getSocketCloser() {
+    return this.socketCloser;
+  }
+  
   /** check to see if there are still any receiver threads for the given end-point */
   protected boolean hasReceiversFor(Stub endPoint) {
     synchronized (this.receivers) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fcb2fd9/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserJUnitTest.java
new file mode 100644
index 0000000..0b66ec5
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserJUnitTest.java
@@ -0,0 +1,180 @@
+package com.gemstone.gemfire.internal;
+
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+import dunit.DistributedTestCase;
+import dunit.DistributedTestCase.WaitCriterion;
+
+/**
+ * Tests the default SocketCloser.
+ */
+@Category(UnitTest.class)
+public class SocketCloserJUnitTest {
+
+  private SocketCloser socketCloser;
+  
+  @Before
+  public void setUp() throws Exception {
+    this.socketCloser = createSocketCloser();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    this.socketCloser.close();
+  }
+  
+  private Socket createClosableSocket() {
+    return new Socket();
+  }
+
+  protected SocketCloser createSocketCloser() {
+    return new SocketCloser();
+  }
+  
+  /**
+   * Test that close requests are async.
+   */
+  @Test
+  public void testAsync() {
+    final CountDownLatch cdl = new CountDownLatch(1);
+    final AtomicInteger waitingToClose = new AtomicInteger(0);
+    Runnable r = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          waitingToClose.incrementAndGet();
+          cdl.await();
+        } catch (InterruptedException e) {
+        }
+      }
+    };
+    
+    final int SOCKET_COUNT = 100;
+    final Socket[] aSockets = new Socket[SOCKET_COUNT];
+    for (int i=0; i < SOCKET_COUNT; i++) {
+      aSockets[i] = createClosableSocket();
+    }
+    // Schedule 100 sockets for async close.
+    // They should all be stuck on cdl.
+    for (int i=0; i < SOCKET_COUNT; i++) {
+      this.socketCloser.asyncClose(aSockets[i], "A", r);
+    }
+    // Make sure the sockets have not been closed
+    for (int i=0; i < SOCKET_COUNT; i++) {
+      assertEquals(false, aSockets[i].isClosed());
+    }
+    final Socket[] bSockets = new Socket[SOCKET_COUNT];
+    for (int i=0; i < SOCKET_COUNT; i++) {
+      bSockets[i] = createClosableSocket();
+    }
+    // Schedule 100 sockets for async close.
+    // They should all be stuck on cdl.
+    for (int i=0; i < SOCKET_COUNT; i++) {
+      this.socketCloser.asyncClose(bSockets[i], "B", r);
+    }
+    // Make sure the sockets have not been closed
+    for (int i=0; i < SOCKET_COUNT; i++) {
+      assertEquals(false, bSockets[i].isClosed());
+    }
+    // close the socketCloser first to verify that the sockets
+    // that have already been scheduled will still be closed.
+    this.socketCloser.releaseResourcesForAddress("A");
+    this.socketCloser.close();
+    // Each thread pool (one for A and one for B) has a max of 8 threads.
+    // So verify that this many are currently waiting on cdl.
+    {
+      final int maxThreads = this.socketCloser.getMaxThreads();
+      WaitCriterion wc = new WaitCriterion() {
+        public boolean done() {
+          return waitingToClose.get() == 2*maxThreads;
+        }
+        public String description() {
+          return "expected " + 2*maxThreads + " waiters but found only " + waitingToClose.get();
+        }
+      };
+      DistributedTestCase.waitForCriterion(wc, 5000, 10, true);
+    }
+    // now count down the latch that allows the sockets to close
+    cdl.countDown();
+    // now all the sockets should get closed; use a wait criterion
+    // since a thread pool is doing the closes
+    {
+      WaitCriterion wc = new WaitCriterion() {
+        public boolean done() {
+          for (int i=0; i < SOCKET_COUNT; i++) {
+            if (!aSockets[i].isClosed() || !bSockets[i].isClosed()) {
+              return false;
+            }
+          }
+          return true;
+        }
+        public String description() {
+          return "one or more sockets did not close";
+        }
+      };
+      DistributedTestCase.waitForCriterion(wc, 5000, 10, true);
+    }
+  }
+  
+  /**
+   * Verify that requesting an asyncClose on an already
+   * closed socket is a noop.
+   */
+  @Test
+  public void testClosedSocket() throws IOException {
+    final AtomicBoolean runnableCalled = new AtomicBoolean();
+    Runnable r = new Runnable() {
+      @Override
+      public void run() {
+        runnableCalled.set(true);
+      }
+    };
+    
+    Socket s = createClosableSocket();
+    s.close();
+    this.socketCloser.asyncClose(s, "A", r);
+    DistributedTestCase.pause(10);
+    assertEquals(false, runnableCalled.get());
+  }
+  
+  /**
+   * Verify that a closed SocketCloser will still close an open socket
+   */
+  @Test
+  public void testClosedSocketCloser() {
+    final AtomicBoolean runnableCalled = new AtomicBoolean();
+    Runnable r = new Runnable() {
+      @Override
+      public void run() {
+        runnableCalled.set(true);
+      }
+    };
+    
+    final Socket s = createClosableSocket();
+    this.socketCloser.close();
+    this.socketCloser.asyncClose(s, "A", r);
+    WaitCriterion wc = new WaitCriterion() {
+      public boolean done() {
+        return runnableCalled.get() && s.isClosed(); 
+      }
+      public String description() {
+        return "runnable was not called or socket was not closed";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, 5000, 10, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fcb2fd9/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserWithWaitJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserWithWaitJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserWithWaitJUnitTest.java
new file mode 100644
index 0000000..9e63743
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/SocketCloserWithWaitJUnitTest.java
@@ -0,0 +1,22 @@
+package com.gemstone.gemfire.internal;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+/**
+ * Tests SocketCloser with a wait time. The default SocketCloser does not wait.
+ * This test configures a closer much like the one used by CacheClientNotifier.
+ */
+@Category(UnitTest.class)
+public class SocketCloserWithWaitJUnitTest extends SocketCloserJUnitTest {
+  @Override
+  protected SocketCloser createSocketCloser() {
+    return new SocketCloser(
+        SocketCloser.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS,
+        1, // max threads
+        1, TimeUnit.NANOSECONDS);
+  }
+}