You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/10/31 20:55:50 UTC

[geode] branch feature/GEODE-3637 updated (a5e3ed0 -> 6e4e702)

This is an automated email from the ASF dual-hosted git repository.

udo pushed a change to branch feature/GEODE-3637
in repository https://gitbox.apache.org/repos/asf/geode.git.


    from a5e3ed0  GEODE-3637: Moved client queue initialization into the ServerConnection.java
     new 55fad5e  GEODE-3637: Committing tests in order not to loose progress
     new 6e4e702  GEODE-3637: Final commit with test to confirm asynchronous client queue creation

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../internal/cache/tier/sockets/AcceptorImpl.java  | 335 ++++++++++++---------
 .../cache/tier/sockets/ServerConnection.java       |  12 -
 .../apache/geode/internal/SSLConfigJUnitTest.java  |   5 +-
 .../sockets/AcceptorImplClientQueueDUnitTest.java  | 263 ++++++++++++++++
 .../apache/geode/test/dunit/rules/CacheRule.java   |  22 +-
 5 files changed, 472 insertions(+), 165 deletions(-)
 create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].

[geode] 02/02: GEODE-3637: Final commit with test to confirm asynchronous client queue creation

Posted by ud...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/GEODE-3637
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 6e4e70282a34ce5299d6968994d1be34ded4955a
Author: kohlmu-pivotal <uk...@pivotal.io>
AuthorDate: Tue Oct 31 13:55:42 2017 -0700

    GEODE-3637: Final commit with test to confirm asynchronous client queue creation
---
 .../internal/cache/tier/sockets/AcceptorImpl.java  | 105 ++++++-------
 .../cache/tier/sockets/ServerConnection.java       |  12 --
 .../sockets/AcceptorImplClientQueueDUnitTest.java} | 168 ++++++++++-----------
 .../apache/geode/test/dunit/rules/CacheRule.java   |  22 ++-
 4 files changed, 154 insertions(+), 153 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 721874f..193b300 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -95,6 +95,7 @@ import org.apache.geode.internal.util.ArrayUtils;
 /**
  * Implements the acceptor thread on the bridge server. Accepts connections from the edge and starts
  * up threads to process requests from these.
+ * 
  * @since GemFire 2.0.2
  */
 @SuppressWarnings("deprecation")
@@ -102,6 +103,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   private static final Logger logger = LogService.getLogger();
 
   private static final boolean isJRockit = System.getProperty("java.vm.name").contains("JRockit");
+  private static final int HANDSHAKER_DEFAULT_POOL_SIZE = 4;
 
   protected final CacheServerStats stats;
   private final int maxConnections;
@@ -272,6 +274,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * The ip address or host name this acceptor is to bind to; <code>null</code> or "" indicates it
    * will listen on all local addresses.
+   * 
    * @since GemFire 5.7
    */
   private final String bindHostName;
@@ -311,13 +314,14 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * Initializes this acceptor thread to listen for connections on the given port.
+   * 
    * @param port The port on which this acceptor listens for connections. If <code>0</code>, a
-   * random port will be chosen.
+   *        random port will be chosen.
    * @param bindHostName The ip address or host name this acceptor listens on for connections. If
-   * <code>null</code> or "" then all local addresses are used
+   *        <code>null</code> or "" then all local addresses are used
    * @param socketBufferSize The buffer size for server-side sockets
    * @param maximumTimeBetweenPings The maximum time between client pings. This value is used by the
-   * <code>ClientHealthMonitor</code> to monitor the health of this server's clients.
+   *        <code>ClientHealthMonitor</code> to monitor the health of this server's clients.
    * @param internalCache The GemFire cache whose contents is served to clients
    * @param maxConnections the maximum number of connections allowed in the server pool
    * @param maxThreads the maximum number of threads allowed in the server pool
@@ -326,14 +330,11 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
    * @since GemFire 5.7
    */
   public AcceptorImpl(int port, String bindHostName, boolean notifyBySubscription,
-                      int socketBufferSize, int maximumTimeBetweenPings,
-                      InternalCache internalCache,
-                      int maxConnections, int maxThreads, int maximumMessageCount,
-                      int messageTimeToLive,
-                      ConnectionListener listener, List overflowAttributesList,
-                      boolean isGatewayReceiver,
-                      List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay,
-                      ServerConnectionFactory serverConnectionFactory) throws IOException {
+      int socketBufferSize, int maximumTimeBetweenPings, InternalCache internalCache,
+      int maxConnections, int maxThreads, int maximumMessageCount, int messageTimeToLive,
+      ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver,
+      List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay,
+      ServerConnectionFactory serverConnectionFactory) throws IOException {
     this.securityService = internalCache.getSecurityService();
     this.bindHostName = calcBindHostName(internalCache, bindHostName);
     this.connectionListener = listener == null ? new ConnectionListenerAdapter() : listener;
@@ -445,7 +446,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
         // fix for bug 36617. If BindException is thrown, retry after
         // sleeping. The server may have been stopped and then
         // immediately restarted, which sometimes results in a bind exception
-        for (; ; ) {
+        for (;;) {
           try {
             this.serverSock.bind(new InetSocketAddress(getBindAddress(), port), backLog);
             break;
@@ -474,7 +475,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
         // fix for bug 36617. If BindException is thrown, retry after
         // sleeping. The server may have been stopped and then
         // immediately restarted, which sometimes results in a bind exception
-        for (; ; ) {
+        for (;;) {
           try {
             this.serverSock = this.socketCreator.createServerSocket(port, backLog, getBindAddress(),
                 this.gatewayTransportFilters, socketBufferSize);
@@ -519,7 +520,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
       String sockName = getServerName();
       logger.info(LocalizedMessage.create(
           LocalizedStrings.AcceptorImpl_CACHE_SERVER_CONNECTION_LISTENER_BOUND_TO_ADDRESS_0_WITH_BACKLOG_1,
-          new Object[]{sockName, Integer.valueOf(backLog)}));
+          new Object[] {sockName, Integer.valueOf(backLog)}));
       if (isGatewayReceiver) {
         this.stats = GatewayReceiverStats.createGatewayReceiverStats(sockName);
       } else {
@@ -581,6 +582,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
           }
         }
       };
+      logger.warn("Handshaker max Pool size: " + HANDSHAKE_POOL_SIZE);
       return new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, blockingQueue,
           socketThreadFactory, rejectedExecutionHandler);
     } catch (IllegalArgumentException poolInitException) {
@@ -592,16 +594,15 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   }
 
   private ThreadPoolExecutor initializeClientQueueInitializerThreadPool() throws IOException {
-    final ThreadGroup clientQueueThreadGroup = LoggingThreadGroup.createThreadGroup(
-        "Client Queue Initialization ", logger);
+    final ThreadGroup clientQueueThreadGroup =
+        LoggingThreadGroup.createThreadGroup("Client Queue Initialization ", logger);
 
     ThreadFactory clientQueueThreadFactory = new ThreadFactory() {
       AtomicInteger connNum = new AtomicInteger(-1);
 
       @Override
       public Thread newThread(final Runnable command) {
-        String
-            threadName =
+        String threadName =
             clientQueueThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
         Runnable runnable = new Runnable() {
           public void run() {
@@ -616,8 +617,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
       }
     };
     try {
-      return new ThreadPoolExecutor(1, 16, 60,
-          TimeUnit.SECONDS, new SynchronousQueue(), clientQueueThreadFactory);
+      return new ThreadPoolExecutor(1, 16, 60, TimeUnit.SECONDS, new SynchronousQueue(),
+          clientQueueThreadFactory);
     } catch (IllegalArgumentException poolInitException) {
       throw poolInitException;
     }
@@ -683,6 +684,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * This system property is only used if max-threads == 0. This is for 5.0.2 backwards
    * compatibility.
+   * 
    * @deprecated since 5.1 use cache-server max-threads instead
    */
   @Deprecated
@@ -691,13 +693,14 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * This system property is only used if max-threads == 0. This is for 5.0.2 backwards
    * compatibility.
+   * 
    * @deprecated since 5.1 use cache-server max-threads instead
    */
   @Deprecated
   private final static int DEPRECATED_SELECTOR_POOL_SIZE =
       Integer.getInteger("BridgeServer.SELECTOR_POOL_SIZE", 16).intValue();
-  private final static int HANDSHAKE_POOL_SIZE =
-      Integer.getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", 4).intValue();
+  private final static int HANDSHAKE_POOL_SIZE = Integer
+      .getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", HANDSHAKER_DEFAULT_POOL_SIZE).intValue();
 
   @Override
   public void start() throws IOException {
@@ -828,6 +831,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * Ensure that the CachedRegionHelper and ServerConnection classes get loaded.
+   * 
    * @see SystemFailure#loadEmergencyClasses()
    */
   public static void loadEmergencyClasses() {
@@ -1189,6 +1193,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * The work loop of this acceptor
+   * 
    * @see #accept
    */
   public void run() {
@@ -1229,8 +1234,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   }
 
   /**
-   * {@linkplain ServerSocket#accept Listens}for a client to connect and then creates a new {@link
-   * ServerConnection}to handle messages from that client.
+   * {@linkplain ServerSocket#accept Listens}for a client to connect and then creates a new
+   * {@link ServerConnection}to handle messages from that client.
    */
   @Override
   public void accept() {
@@ -1318,7 +1323,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
    * blocking is good because it will throttle the rate at which we create new connections.
    */
   private void handOffNewClientConnection(final Socket socket,
-                                          final ServerConnectionFactory serverConnectionFactory) {
+      final ServerConnectionFactory serverConnectionFactory) {
     try {
       this.stats.incAcceptsInProgress();
       this.hsPool.execute(new Runnable() {
@@ -1403,8 +1408,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   }
 
   protected void handleNewClientConnection(final Socket socket,
-                                           final ServerConnectionFactory serverConnectionFactory)
-      throws IOException {
+      final ServerConnectionFactory serverConnectionFactory) throws IOException {
     // Read the first byte. If this socket is being used for 'client to server'
     // communication, create a ServerConnection. If this socket is being used
     // for 'server to client' communication, send it to the CacheClientNotifier
@@ -1425,24 +1429,11 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
       throw new EOFException();
     }
 
-    // GEODE-3637
-    if (communicationMode.isSubscriptionFeed()) {
-      boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
-      logger.info(
-          ":Bridge server: Initializing {} server-to-client communication socket: {} using selector={}",
-          primary ? "primary" : "secondary", socket, isSelector());
-      AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId,
-          this.notifyBySubscription);
+    // GEODE-3637 - If the communicationMode is client Subscriptions, hand-off the client queue
+    // initialization to be done in another threadPool
+    if (initializeClientPools(socket, communicationMode)) {
       return;
     }
-//    if (communicationMode.isSubscriptionFeed()) {
-//      boolean
-//          isPrimaryServerToClient =
-//          communicationMode == CommunicationMode.PrimaryServerToClient;
-//      clientQueueInitPool
-//          .execute(new ClientQueueInitializerTask(socket, isPrimaryServerToClient, this));
-//      return;
-//    }
 
     logger.debug("Bridge server: Initializing {} communication socket: {}", communicationMode,
         socket);
@@ -1452,7 +1443,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
       if (curCnt >= this.maxConnections) {
         logger.warn(LocalizedMessage.create(
             LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_CURRENT_CONNECTION_COUNT_OF_1_IS_GREATER_THAN_OR_EQUAL_TO_THE_CONFIGURED_MAX_OF_2,
-            new Object[]{socket.getInetAddress(), Integer.valueOf(curCnt),
+            new Object[] {socket.getInetAddress(), Integer.valueOf(curCnt),
                 Integer.valueOf(this.maxConnections)}));
         if (communicationMode.expectsConnectionRefusalMessage()) {
           try {
@@ -1492,7 +1483,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
         }
         logger.warn(LocalizedMessage.create(
             LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_REQUEST_REJECTED_BY_POOL,
-            new Object[]{serverConn}));
+            new Object[] {serverConn}));
         try {
           ServerHandShakeProcessor.refuse(socket.getOutputStream(),
               LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0
@@ -1506,6 +1497,17 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     }
   }
 
+  private boolean initializeClientPools(Socket socket, CommunicationMode communicationMode) {
+    if (communicationMode.isSubscriptionFeed()) {
+      boolean isPrimaryServerToClient =
+          communicationMode == CommunicationMode.PrimaryServerToClient;
+      clientQueueInitPool
+          .execute(new ClientQueueInitializerTask(socket, isPrimaryServerToClient, this));
+      return true;
+    }
+    return false;
+  }
+
   private CommunicationMode getCommunicationModeForNonSelector(Socket socket) throws IOException {
     socket.setSoTimeout(this.acceptTimeout);
     this.socketCreator.configureServerSSLSocket(socket);
@@ -1667,9 +1669,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * @param bindName the ip address or host name that this acceptor should bind to. If null or ""
-   * then calculate it.
+   *        then calculate it.
    * @return the ip address or host name this acceptor will listen on. An "" if all local addresses
-   * will be listened to.
+   *         will be listened to.
    * @since GemFire 5.7
    */
   private static String calcBindHostName(Cache cache, String bindName) {
@@ -1706,6 +1708,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * Gets the address that this bridge server can be contacted on from external processes.
+   * 
    * @since GemFire 5.7
    */
   public String getExternalAddress() {
@@ -1739,6 +1742,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * This method finds a client notifier and returns it. It is used to propagate interest
    * registrations to other servers
+   * 
    * @return the instance that provides client notification
    */
   @Override
@@ -1826,7 +1830,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     private final AcceptorImpl acceptor;
 
     public ClientQueueInitializerTask(Socket socket, boolean isPrimaryServerToClient,
-                                      AcceptorImpl acceptor) {
+        AcceptorImpl acceptor) {
       this.socket = socket;
       this.acceptor = acceptor;
       this.isPrimaryServerToClient = isPrimaryServerToClient;
@@ -1837,9 +1841,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
       logger.info(":Bridge server: Initializing {} server-to-client communication socket: {}",
           isPrimaryServerToClient ? "primary" : "secondary", socket);
       try {
-        acceptor.getCacheClientNotifier()
-            .registerClient(socket, isPrimaryServerToClient, acceptor.getAcceptorId(),
-                acceptor.isNotifyBySubscription());
+        acceptor.getCacheClientNotifier().registerClient(socket, isPrimaryServerToClient,
+            acceptor.getAcceptorId(), acceptor.isNotifyBySubscription());
       } catch (IOException ex) {
         closeSocket(socket);
         if (isRunning()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 1cac128..b141c6c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -1134,8 +1134,6 @@ public abstract class ServerConnection implements Runnable {
     if (getAcceptor().isSelector()) {
       boolean finishedMsg = false;
       try {
-        initializeClientNofication();
-
         this.stats.decThreadQueueSize();
         if (!isTerminated()) {
           getAcceptor().setTLCommBuffer();
@@ -1190,16 +1188,6 @@ public abstract class ServerConnection implements Runnable {
     }
   }
 
-  private void initializeClientNofication() throws IOException {
-//    if (communicationMode.isSubscriptionFeed()) {
-//      boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
-//      logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
-//          primary ? "primary" : "secondary", theSocket);
-//      getAcceptor().getCacheClientNotifier().registerClient(theSocket, primary,
-//          getAcceptor().getAcceptorId(), getAcceptor().isNotifyBySubscription());
-//    }
-  }
-
   /**
    * If registered with a selector then this will be the key we are registered with.
    */
diff --git a/geode-core/src/test/java/org/apache/geode/internal/AcceptorImplDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
similarity index 69%
rename from geode-core/src/test/java/org/apache/geode/internal/AcceptorImplDUnitTest.java
rename to geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
index 9da6ca9..f5a2e63 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/AcceptorImplDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
@@ -12,19 +12,22 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.internal;
+package org.apache.geode.internal.cache.tier.sockets;
 
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.rmi.RemoteException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -43,6 +46,8 @@ import org.apache.geode.cache.client.ClientRegionShortcut;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache.server.ClientSubscriptionConfig;
 import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.DistributedLockBlackboard;
+import org.apache.geode.distributed.DistributedLockBlackboardImpl;
 import org.apache.geode.internal.cache.DiskStoreAttributes;
 import org.apache.geode.internal.cache.InitialImageOperation;
 import org.apache.geode.internal.cache.InternalCache;
@@ -58,17 +63,19 @@ import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolde
 import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
 
 @Category(DistributedTest.class)
-public class AcceptorImplDUnitTest implements Serializable {
+public class AcceptorImplClientQueueDUnitTest implements Serializable {
   private final Host host = Host.getHost(0);
-  private final static int numberOfEntries = 1000;
+  private final static int numberOfEntries = 200;
   private final static AtomicInteger eventCount = new AtomicInteger(0);
+  private final static AtomicBoolean completedClient2 = new AtomicBoolean(false);
 
   @ClassRule
   public static DistributedTestRule distributedTestRule = new DistributedTestRule();
 
   @Rule
-  public CacheRule cacheRule = CacheRule.builder().createCacheIn(host.getVM(0)).createCacheIn(
-      host.getVM(1)).build();
+  public CacheRule cacheRule =
+      CacheRule.builder().createCacheIn(host.getVM(0)).createCacheIn(host.getVM(1))
+          .addSystemProperty("BridgeServer.HANDSHAKE_POOL_SIZE", "1").build();
 
   @Rule
   public SerializableTestName name = new SerializableTestName();
@@ -77,30 +84,23 @@ public class AcceptorImplDUnitTest implements Serializable {
   public SerializableTemporaryFolder tempDir = new SerializableTemporaryFolder();
 
   @Rule
-  public DistributedRestoreSystemProperties restoreSystemProperties = new DistributedRestoreSystemProperties();
+  public DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
 
-  @Rule
-  public SharedCountersRule sharedCountersRule = SharedCountersRule.builder().build();
+  private DistributedLockBlackboard blackboard = null;
 
   @Before
-  public void setup() {
-    Host host = this.host;
-    sharedCountersRule.initialize("startNow");
-    host.getAllVMs().forEach((vm) ->
-        vm.invoke(() -> {
-          System.setProperty("BridgeServer.HANDSHAKE_POOL_SIZE", "1");
-        })
-    );
+  public void setup() throws Exception {
+    blackboard = DistributedLockBlackboardImpl.getInstance();
   }
 
   @After
-  public void tearDown() {
-
-    host.getAllVMs().forEach((vm) ->
-        vm.invoke(() -> {
-          InitialImageOperation.slowImageProcessing = 0;
-        })
-    );
+  public void tearDown() throws RemoteException {
+    blackboard.initCount();
+    host.getAllVMs().forEach((vm) -> vm.invoke(() -> {
+      InitialImageOperation.slowImageProcessing = 0;
+      System.getProperties().remove("BridgeServer.HANDSHAKE_POOL_SIZE");
+    }));
   }
 
   @Test
@@ -123,10 +123,8 @@ public class AcceptorImplDUnitTest implements Serializable {
       clientCacheFactory.setPoolSubscriptionRedundancy(1);
       clientCacheFactory.setPoolReadTimeout(200);
       clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
-      ClientCache
-          cache =
-          clientCacheFactory.set("durable-client-id", "1").set("durable-client-timeout", "300")
-              .set("mcast-port", "0").create();
+      ClientCache cache = clientCacheFactory.set("durable-client-id", "1")
+          .set("durable-client-timeout", "300").set("mcast-port", "0").create();
       ClientRegionFactory<Object, Object> clientRegionFactory =
           cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
       Region region = clientRegionFactory.create("subscriptionRegion");
@@ -163,25 +161,24 @@ public class AcceptorImplDUnitTest implements Serializable {
       InitialImageOperation.slowImageProcessing = 30;
     });
 
-    vm2.invokeAsync("Start Client1, expecting durable messages to be delivered", () -> {
+    AsyncInvocation<Boolean> completedClient1 =
+        vm2.invokeAsync("Start Client1, expecting durable messages to be delivered", () -> {
 
-      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
-      clientCacheFactory.setPoolSubscriptionEnabled(true);
-      clientCacheFactory.setPoolSubscriptionRedundancy(1);
-      clientCacheFactory.setPoolMinConnections(1);
-      clientCacheFactory.setPoolMaxConnections(1);
-      clientCacheFactory.setPoolReadTimeout(200);
-      clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
-      clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
-      sharedCountersRule.increment("startNow");
-      ClientCache
-          cache =
-          clientCacheFactory.set("durable-client-id", "1").set("durable-client-timeout", "300")
-              .set("mcast-port", "0").create();
-      ClientRegionFactory<Object, Object> clientRegionFactory =
-          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
-      Region region = clientRegionFactory
-          .addCacheListener(new CacheListenerAdapter() {
+          ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+          clientCacheFactory.setPoolSubscriptionEnabled(true);
+          clientCacheFactory.setPoolSubscriptionRedundancy(1);
+          clientCacheFactory.setPoolMinConnections(1);
+          clientCacheFactory.setPoolMaxConnections(1);
+          clientCacheFactory.setPoolReadTimeout(200);
+          clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
+          ClientCacheFactory cacheFactory = clientCacheFactory.set("durable-client-id", "1")
+              .set("durable-client-timeout", "300").set("mcast-port", "0");
+          blackboard.incCount();
+          ClientCache cache = cacheFactory.create();
+
+          ClientRegionFactory<Object, Object> clientRegionFactory =
+              cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+          Region region = clientRegionFactory.addCacheListener(new CacheListenerAdapter() {
             @Override
             public void afterCreate(EntryEvent event) {
               eventCount.incrementAndGet();
@@ -191,45 +188,45 @@ public class AcceptorImplDUnitTest implements Serializable {
             public void afterUpdate(EntryEvent event) {
               eventCount.incrementAndGet();
             }
-          })
-          .create("subscriptionRegion");
-
-      region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
-      cache.readyForEvents();
-    });
+          }).create("subscriptionRegion");
 
-    AsyncInvocation<Boolean>
-        completed =
-        vm3.invokeAsync("Start Client2 to add entries to region", () -> {
-          while (true) {
-            Thread.sleep(100);
-            if (sharedCountersRule.getTotal("startNow") == 1) {
-              break;
-            }
-          }
-          ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
-          clientCacheFactory.setPoolRetryAttempts(0);
-          clientCacheFactory.setPoolMinConnections(1);
-          clientCacheFactory.setPoolMaxConnections(1);
-          clientCacheFactory.setPoolReadTimeout(200);
-          clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
-          clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
-          ClientCache cache = clientCacheFactory.set("mcast-port", "0").create();
-          ClientRegionFactory<Object, Object> clientRegionFactory =
-              cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
-          Region region = clientRegionFactory.create("subscriptionRegion");
-
-          for (int i = 0; i < 100; i++) {
-            System.err.println("i = " + region.get(i));
-          }
+          region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+          cache.readyForEvents();
+          Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
+              .until(() -> eventCount.get() == numberOfEntries);
           cache.close();
-          return true;
+          return eventCount.get() == numberOfEntries;
         });
-    Awaitility.await().atMost(40, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
-        .until(() -> vm2.invoke(() -> {
-          return eventCount.get();
-        }) == numberOfEntries);
-    assertTrue(completed.get());
+
+    vm3.invokeAsync("Start Client2 to add entries to region", () -> {
+      while (true) {
+        Thread.sleep(100);
+        if (blackboard.getCount() == 1) {
+          break;
+        }
+      }
+      ClientCache cache = null;
+      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+      clientCacheFactory.setPoolRetryAttempts(0);
+      clientCacheFactory.setPoolMinConnections(1);
+      clientCacheFactory.setPoolMaxConnections(1);
+      clientCacheFactory.setPoolReadTimeout(200);
+      clientCacheFactory.setPoolSocketConnectTimeout(500);
+      clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
+      cache = clientCacheFactory.set("mcast-port", "0").create();
+      ClientRegionFactory<Object, Object> clientRegionFactory =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region region = clientRegionFactory.create("subscriptionRegion");
+
+      int returnValue = 0;
+      for (int i = 0; i < 100; i++) {
+        returnValue = (int) region.get(i);
+      }
+      cache.close();
+      completedClient2.set(returnValue == 99);
+    });
+    assertTrue(completedClient1.get());
+    assertTrue(vm3.invoke(() -> completedClient2.get()));
   }
 
   private int createSubscriptionServer(InternalCache cache) throws IOException {
@@ -241,21 +238,20 @@ public class AcceptorImplDUnitTest implements Serializable {
   private void initializeDiskStore(InternalCache cache) throws IOException {
     DiskStoreAttributes diskStoreAttributes = new DiskStoreAttributes();
     diskStoreAttributes.name = "clientQueueDS";
-    diskStoreAttributes.diskDirs = new File[]{tempDir.newFolder(name + "_dir")};
+    diskStoreAttributes.diskDirs = new File[] {tempDir.newFolder(name + "_dir")};
     cache.createDiskStoreFactory(diskStoreAttributes).create("clientQueueDS");
   }
 
   private void initializeReplicateRegion(InternalCache cache) {
     cache.createRegionFactory(RegionShortcut.REPLICATE).setStatisticsEnabled(true)
-        .setSubscriptionAttributes(new SubscriptionAttributes(
-            InterestPolicy.ALL)).create("subscriptionRegion");
+        .setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL))
+        .create("subscriptionRegion");
   }
 
   private int initializeCacheServerWithSubscription(Host host, InternalCache cache)
       throws IOException {
     CacheServer cacheServer1 = cache.addCacheServer(false);
-    ClientSubscriptionConfig clientSubscriptionConfig =
-        cacheServer1.getClientSubscriptionConfig();
+    ClientSubscriptionConfig clientSubscriptionConfig = cacheServer1.getClientSubscriptionConfig();
     clientSubscriptionConfig.setEvictionPolicy("entry");
     clientSubscriptionConfig.setCapacity(5);
     clientSubscriptionConfig.setDiskStoreName("clientQueueDS");
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
index dc42da8..b65bf86 100644
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
@@ -63,6 +63,7 @@ public class CacheRule extends DistributedExternalResource {
   private final boolean disconnectAfter;
   private final List<VM> createCacheInVMs;
   private final Properties config;
+  private final Properties systemProperties;
 
   public static Builder builder() {
     return new Builder();
@@ -74,18 +75,19 @@ public class CacheRule extends DistributedExternalResource {
     this.disconnectAfter = builder.disconnectAfter;
     this.createCacheInVMs = builder.createCacheInVMs;
     this.config = builder.config;
+    this.systemProperties = builder.systemProperties;
   }
 
   @Override
   protected void before() {
     if (createCacheInAll) {
-      invoker().invokeInEveryVMAndController(() -> createCache(config));
+      invoker().invokeInEveryVMAndController(() -> createCache(config, systemProperties));
     } else {
       if (createCache) {
-        createCache(config);
+        createCache(config, systemProperties);
       }
       for (VM vm : createCacheInVMs) {
-        vm.invoke(() -> createCache(config));
+        vm.invoke(() -> createCache(config, systemProperties));
       }
     }
   }
@@ -108,7 +110,8 @@ public class CacheRule extends DistributedExternalResource {
     return cache.getInternalDistributedSystem();
   }
 
-  private static void createCache(final Properties config) {
+  private static void createCache(final Properties config, final Properties systemProperties) {
+    System.getProperties().putAll(systemProperties);
     cache = (InternalCache) new CacheFactory(config).create();
   }
 
@@ -141,6 +144,7 @@ public class CacheRule extends DistributedExternalResource {
     private boolean disconnectAfter;
     private List<VM> createCacheInVMs = new ArrayList<>();
     private Properties config = new Properties();
+    private Properties systemProperties = new Properties();
 
     public Builder() {
       config.setProperty(LOCATORS, getLocators());
@@ -195,6 +199,16 @@ public class CacheRule extends DistributedExternalResource {
       return this;
     }
 
+    public Builder addSystemProperty(final String key, final String value) {
+      this.systemProperties.put(key, value);
+      return this;
+    }
+
+    public Builder addSystemProperties(final Properties config) {
+      this.systemProperties.putAll(config);
+      return this;
+    }
+
     public CacheRule build() {
       return new CacheRule(this);
     }

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.

[geode] 01/02: GEODE-3637: Committing tests in order not to loose progress

Posted by ud...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/GEODE-3637
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 55fad5e725b73630d5ccb88ca66917c554c31334
Author: kohlmu-pivotal <uk...@pivotal.io>
AuthorDate: Fri Oct 27 14:27:30 2017 -0700

    GEODE-3637: Committing tests in order not to loose progress
---
 .../internal/cache/tier/sockets/AcceptorImpl.java  | 364 ++++++++++++---------
 .../cache/tier/sockets/ServerConnection.java       |  14 +-
 .../geode/internal/AcceptorImplDUnitTest.java      | 267 +++++++++++++++
 .../apache/geode/internal/SSLConfigJUnitTest.java  |   5 +-
 4 files changed, 478 insertions(+), 172 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index ad910bd..721874f 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -95,7 +95,6 @@ import org.apache.geode.internal.util.ArrayUtils;
 /**
  * Implements the acceptor thread on the bridge server. Accepts connections from the edge and starts
  * up threads to process requests from these.
- *
  * @since GemFire 2.0.2
  */
 @SuppressWarnings("deprecation")
@@ -115,6 +114,11 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   private final ThreadPoolExecutor hsPool;
 
   /**
+   * A pool used to process client-queue-initializations.
+   */
+  private final ThreadPoolExecutor clientQueueInitPool;
+
+  /**
    * The port on which this acceptor listens for client connections
    */
   private final int localPort;
@@ -268,7 +272,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * The ip address or host name this acceptor is to bind to; <code>null</code> or "" indicates it
    * will listen on all local addresses.
-   *
    * @since GemFire 5.7
    */
   private final String bindHostName;
@@ -308,14 +311,13 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * Initializes this acceptor thread to listen for connections on the given port.
-   *
    * @param port The port on which this acceptor listens for connections. If <code>0</code>, a
-   *        random port will be chosen.
+   * random port will be chosen.
    * @param bindHostName The ip address or host name this acceptor listens on for connections. If
-   *        <code>null</code> or "" then all local addresses are used
+   * <code>null</code> or "" then all local addresses are used
    * @param socketBufferSize The buffer size for server-side sockets
    * @param maximumTimeBetweenPings The maximum time between client pings. This value is used by the
-   *        <code>ClientHealthMonitor</code> to monitor the health of this server's clients.
+   * <code>ClientHealthMonitor</code> to monitor the health of this server's clients.
    * @param internalCache The GemFire cache whose contents is served to clients
    * @param maxConnections the maximum number of connections allowed in the server pool
    * @param maxThreads the maximum number of threads allowed in the server pool
@@ -324,11 +326,14 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
    * @since GemFire 5.7
    */
   public AcceptorImpl(int port, String bindHostName, boolean notifyBySubscription,
-      int socketBufferSize, int maximumTimeBetweenPings, InternalCache internalCache,
-      int maxConnections, int maxThreads, int maximumMessageCount, int messageTimeToLive,
-      ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver,
-      List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay,
-      ServerConnectionFactory serverConnectionFactory) throws IOException {
+                      int socketBufferSize, int maximumTimeBetweenPings,
+                      InternalCache internalCache,
+                      int maxConnections, int maxThreads, int maximumMessageCount,
+                      int messageTimeToLive,
+                      ConnectionListener listener, List overflowAttributesList,
+                      boolean isGatewayReceiver,
+                      List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay,
+                      ServerConnectionFactory serverConnectionFactory) throws IOException {
     this.securityService = internalCache.getSecurityService();
     this.bindHostName = calcBindHostName(internalCache, bindHostName);
     this.connectionListener = listener == null ? new ConnectionListenerAdapter() : listener;
@@ -440,7 +445,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
         // fix for bug 36617. If BindException is thrown, retry after
         // sleeping. The server may have been stopped and then
         // immediately restarted, which sometimes results in a bind exception
-        for (;;) {
+        for (; ; ) {
           try {
             this.serverSock.bind(new InetSocketAddress(getBindAddress(), port), backLog);
             break;
@@ -469,7 +474,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
         // fix for bug 36617. If BindException is thrown, retry after
         // sleeping. The server may have been stopped and then
         // immediately restarted, which sometimes results in a bind exception
-        for (;;) {
+        for (; ; ) {
           try {
             this.serverSock = this.socketCreator.createServerSocket(port, backLog, getBindAddress(),
                 this.gatewayTransportFilters, socketBufferSize);
@@ -514,7 +519,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
       String sockName = getServerName();
       logger.info(LocalizedMessage.create(
           LocalizedStrings.AcceptorImpl_CACHE_SERVER_CONNECTION_LISTENER_BOUND_TO_ADDRESS_0_WITH_BACKLOG_1,
-          new Object[] {sockName, Integer.valueOf(backLog)}));
+          new Object[]{sockName, Integer.valueOf(backLog)}));
       if (isGatewayReceiver) {
         this.stats = GatewayReceiverStats.createGatewayReceiverStats(sockName);
       } else {
@@ -534,103 +539,130 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     this.healthMonitor = ClientHealthMonitor.getInstance(internalCache, maximumTimeBetweenPings,
         this.clientNotifier.getStats());
 
-    {
-      ThreadPoolExecutor tmp_pool = null;
-      String gName = "ServerConnection "
-          // + serverSock.getInetAddress()
-          + "on port " + this.localPort;
-      final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
-
-      ThreadFactory socketThreadFactory = new ThreadFactory() {
-        int connNum = -1;
-
-        public Thread newThread(final Runnable command) {
-          int tnum;
-          synchronized (this) {
-            tnum = ++connNum;
+    pool = initializeServerConnectionThreadPool();
+    hsPool = initializeHandshakerThreadPool();
+    clientQueueInitPool = initializeClientQueueInitializerThreadPool();
+
+    isAuthenticationRequired = this.securityService.isClientSecurityRequired();
+
+    isIntegratedSecurity = this.securityService.isIntegratedSecurity();
+
+    String postAuthzFactoryName =
+        this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
+
+    isPostAuthzCallbackPresent =
+        (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) ? true : false;
+  }
+
+  private ThreadPoolExecutor initializeHandshakerThreadPool() throws IOException {
+    String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort;
+    final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+
+    ThreadFactory socketThreadFactory = new ThreadFactory() {
+      AtomicInteger connNum = new AtomicInteger(-1);
+
+      @Override
+      public Thread newThread(Runnable command) {
+        String threadName = socketThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
+        getStats().incAcceptThreadsCreated();
+        return new Thread(socketThreadGroup, command, threadName);
+      }
+    };
+    try {
+      final BlockingQueue blockingQueue = new SynchronousQueue();
+      final RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
+          try {
+            blockingQueue.put(r);
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt(); // preserve the state
+            throw new RejectedExecutionException(
+                LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex);
           }
-          String tName = socketThreadGroup.getName() + " Thread " + tnum;
-          getStats().incConnectionThreadsCreated();
-          Runnable r = new Runnable() {
-            public void run() {
-              try {
-                command.run();
-              } catch (CancelException e) { // bug 39463
-                // ignore
-              } finally {
-                ConnectionTable.releaseThreadsSockets();
-              }
-            }
-          };
-          return new Thread(socketThreadGroup, r, tName);
         }
       };
-      try {
-        if (isSelector()) {
-          tmp_pool = new PooledExecutorWithDMStats(new LinkedBlockingQueue(), this.maxThreads,
-              getStats().getCnxPoolHelper(), socketThreadFactory, Integer.MAX_VALUE);
-        } else {
-          tmp_pool = new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, this.maxConnections, 0L,
-              TimeUnit.MILLISECONDS, new SynchronousQueue(), socketThreadFactory);
-        }
-      } catch (IllegalArgumentException poolInitException) {
-        this.stats.close();
-        this.serverSock.close();
-        throw poolInitException;
-      }
-      this.pool = tmp_pool;
+      return new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, blockingQueue,
+          socketThreadFactory, rejectedExecutionHandler);
+    } catch (IllegalArgumentException poolInitException) {
+      this.stats.close();
+      this.serverSock.close();
+      this.pool.shutdownNow();
+      throw poolInitException;
     }
-    {
-      ThreadPoolExecutor tmp_hsPool = null;
-      String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort;
-      final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+  }
 
-      ThreadFactory socketThreadFactory = new ThreadFactory() {
-        int connNum = -1;
+  private ThreadPoolExecutor initializeClientQueueInitializerThreadPool() throws IOException {
+    final ThreadGroup clientQueueThreadGroup = LoggingThreadGroup.createThreadGroup(
+        "Client Queue Initialization ", logger);
 
-        public Thread newThread(Runnable command) {
-          int tnum;
-          synchronized (this) {
-            tnum = ++connNum;
-          }
-          String tName = socketThreadGroup.getName() + " Thread " + tnum;
-          getStats().incAcceptThreadsCreated();
-          return new Thread(socketThreadGroup, command, tName);
-        }
-      };
-      try {
-        final BlockingQueue bq = new SynchronousQueue();
-        final RejectedExecutionHandler reh = new RejectedExecutionHandler() {
-          public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
+    ThreadFactory clientQueueThreadFactory = new ThreadFactory() {
+      AtomicInteger connNum = new AtomicInteger(-1);
+
+      @Override
+      public Thread newThread(final Runnable command) {
+        String
+            threadName =
+            clientQueueThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
+        Runnable runnable = new Runnable() {
+          public void run() {
             try {
-              bq.put(r);
-            } catch (InterruptedException ex) {
-              Thread.currentThread().interrupt(); // preserve the state
-              throw new RejectedExecutionException(
-                  LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex);
+              command.run();
+            } catch (CancelException e) {
+              logger.debug("Client Queue Initialization was canceled.", e);
             }
           }
         };
-        tmp_hsPool = new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, bq,
-            socketThreadFactory, reh);
-      } catch (IllegalArgumentException poolInitException) {
-        this.stats.close();
-        this.serverSock.close();
-        this.pool.shutdownNow();
-        throw poolInitException;
+        return new Thread(clientQueueThreadGroup, runnable, threadName);
       }
-      this.hsPool = tmp_hsPool;
+    };
+    try {
+      return new ThreadPoolExecutor(1, 16, 60,
+          TimeUnit.SECONDS, new SynchronousQueue(), clientQueueThreadFactory);
+    } catch (IllegalArgumentException poolInitException) {
+      throw poolInitException;
     }
+  }
 
-    isAuthenticationRequired = this.securityService.isClientSecurityRequired();
-
-    isIntegratedSecurity = this.securityService.isIntegratedSecurity();
-
-    String postAuthzFactoryName =
-        this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
-
-    isPostAuthzCallbackPresent =
-        (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) ? true : false;
+  private ThreadPoolExecutor initializeServerConnectionThreadPool() throws IOException {
+    String gName = "ServerConnection "
+        // + serverSock.getInetAddress()
+        + "on port " + this.localPort;
+    final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+
+    ThreadFactory socketThreadFactory = new ThreadFactory() {
+      AtomicInteger connNum = new AtomicInteger(-1);
+
+      @Override
+      public Thread newThread(final Runnable command) {
+        String tName = socketThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
+        getStats().incConnectionThreadsCreated();
+        Runnable r = new Runnable() {
+          public void run() {
+            try {
+              command.run();
+            } catch (CancelException e) { // bug 39463
+              // ignore
+            } finally {
+              ConnectionTable.releaseThreadsSockets();
+            }
+          }
+        };
+        return new Thread(socketThreadGroup, r, tName);
+      }
+    };
+    try {
+      if (isSelector()) {
+        return new PooledExecutorWithDMStats(new LinkedBlockingQueue(), this.maxThreads,
+            getStats().getCnxPoolHelper(), socketThreadFactory, Integer.MAX_VALUE);
+      } else {
+        return new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, this.maxConnections, 0L,
+            TimeUnit.MILLISECONDS, new SynchronousQueue(), socketThreadFactory);
+      }
+    } catch (IllegalArgumentException poolInitException) {
+      this.stats.close();
+      this.serverSock.close();
+      throw poolInitException;
+    }
   }
 
   public long getAcceptorId() {
@@ -651,7 +683,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * This system property is only used if max-threads == 0. This is for 5.0.2 backwards
    * compatibility.
-   *
    * @deprecated since 5.1 use cache-server max-threads instead
    */
   @Deprecated
@@ -660,7 +691,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * This system property is only used if max-threads == 0. This is for 5.0.2 backwards
    * compatibility.
-   *
    * @deprecated since 5.1 use cache-server max-threads instead
    */
   @Deprecated
@@ -798,12 +828,12 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * Ensure that the CachedRegionHelper and ServerConnection classes get loaded.
-   *
    * @see SystemFailure#loadEmergencyClasses()
    */
   public static void loadEmergencyClasses() {
-    if (emergencyClassesLoaded)
+    if (emergencyClassesLoaded) {
       return;
+    }
     emergencyClassesLoaded = true;
     CachedRegionHelper.loadEmergencyClasses();
     ServerConnection.loadEmergencyClasses();
@@ -870,8 +900,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   private Selector tmpSel;
 
   private void checkForStuckKeys() {
-    if (!WORKAROUND_SELECTOR_BUG)
+    if (!WORKAROUND_SELECTOR_BUG) {
       return;
+    }
     if (tmpSel == null) {
       try {
         tmpSel = Selector.open();
@@ -887,8 +918,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     while (it.hasNext()) {
       SelectionKey sk = (SelectionKey) it.next();
       ServerConnection sc = (ServerConnection) sk.attachment();
-      if (sc == null)
+      if (sc == null) {
         continue;
+      }
       try {
         sk.cancel();
         this.selector.selectNow(); // clear the cancelled key
@@ -1040,40 +1072,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
           break;
         }
         if (events == 0) {
-          // zeroEventsCount++;
-          // if (zeroEventsCount > 0) {
-          // zeroEventsCount = 0;
           checkForStuckKeys();
-
-          // try {
-          // this.selector.close(); // this selector is sick!
-          // } catch (IOException ignore) {
-          // }
-          // this.selector = Selector.open();
-          // {
-          // Iterator it = selectorRegistrations.iterator();
-          // while (it.hasNext()) {
-          // ServerConnection sc = (ServerConnection)it.next();
-          // sc.registerWithSelector2(this.selector);
-          // }
-          // }
-          // }
-          // ArrayList al = new ArrayList();
-          // Iterator keysIt = this.selector.keys().iterator();
-          // while (keysIt.hasNext()) {
-          // SelectionKey sk = (SelectionKey)keysIt.next();
-          // al.add(sk.attachment());
-          // sk.cancel();
-          // }
-          // events = this.selector.selectNow();
-          // Iterator alIt = al.iterator();
-          // while (alIt.hasNext()) {
-          // ServerConnection sc = (ServerConnection)alIt.next();
-          // sc.registerWithSelector2(this.selector);
-          // }
-          // events = this.selector.select();
-          // } else {
-          // zeroEventsCount = 0;
         }
         while (events > 0) {
           int cancelCount = 0;
@@ -1130,16 +1129,11 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
                   logger.warn(
                       LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED, rejected));
                 }
-                // } else if (key.isValid() && key.isConnectable()) {
-                // logger.info("DEBUG isConnectable and isValid key=" + key);
-                // finishCon(sc);
               } else {
                 finishCon(sc);
                 if (key.isValid()) {
                   logger.warn(LocalizedMessage.create(
                       LocalizedStrings.AcceptorImpl_IGNORING_EVENT_ON_SELECTOR_KEY__0, key));
-                  // } else {
-                  // logger.info("DEBUG !isValid key=" + key);
                 }
               }
             } catch (CancelledKeyException ex) { // fix for bug 37739
@@ -1195,7 +1189,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * The work loop of this acceptor
-   *
    * @see #accept
    */
   public void run() {
@@ -1236,8 +1229,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   }
 
   /**
-   * {@linkplain ServerSocket#accept Listens}for a client to connect and then creates a new
-   * {@link ServerConnection}to handle messages from that client.
+   * {@linkplain ServerSocket#accept Listens}for a client to connect and then creates a new {@link
+   * ServerConnection}to handle messages from that client.
    */
   @Override
   public void accept() {
@@ -1325,7 +1318,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
    * blocking is good because it will throttle the rate at which we create new connections.
    */
   private void handOffNewClientConnection(final Socket socket,
-      final ServerConnectionFactory serverConnectionFactory) {
+                                          final ServerConnectionFactory serverConnectionFactory) {
     try {
       this.stats.incAcceptsInProgress();
       this.hsPool.execute(new Runnable() {
@@ -1410,7 +1403,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   }
 
   protected void handleNewClientConnection(final Socket socket,
-      final ServerConnectionFactory serverConnectionFactory) throws IOException {
+                                           final ServerConnectionFactory serverConnectionFactory)
+      throws IOException {
     // Read the first byte. If this socket is being used for 'client to server'
     // communication, create a ServerConnection. If this socket is being used
     // for 'server to client' communication, send it to the CacheClientNotifier
@@ -1432,14 +1426,23 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     }
 
     // GEODE-3637
-    // if (communicationMode.isSubscriptionFeed()) {
-    // boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
-    // logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
-    // primary ? "primary" : "secondary", socket);
-    // AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId,
-    // this.notifyBySubscription);
-    // return;
-    // }
+    if (communicationMode.isSubscriptionFeed()) {
+      boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
+      logger.info(
+          ":Bridge server: Initializing {} server-to-client communication socket: {} using selector={}",
+          primary ? "primary" : "secondary", socket, isSelector());
+      AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId,
+          this.notifyBySubscription);
+      return;
+    }
+//    if (communicationMode.isSubscriptionFeed()) {
+//      boolean
+//          isPrimaryServerToClient =
+//          communicationMode == CommunicationMode.PrimaryServerToClient;
+//      clientQueueInitPool
+//          .execute(new ClientQueueInitializerTask(socket, isPrimaryServerToClient, this));
+//      return;
+//    }
 
     logger.debug("Bridge server: Initializing {} communication socket: {}", communicationMode,
         socket);
@@ -1449,7 +1452,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
       if (curCnt >= this.maxConnections) {
         logger.warn(LocalizedMessage.create(
             LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_CURRENT_CONNECTION_COUNT_OF_1_IS_GREATER_THAN_OR_EQUAL_TO_THE_CONFIGURED_MAX_OF_2,
-            new Object[] {socket.getInetAddress(), Integer.valueOf(curCnt),
+            new Object[]{socket.getInetAddress(), Integer.valueOf(curCnt),
                 Integer.valueOf(this.maxConnections)}));
         if (communicationMode.expectsConnectionRefusalMessage()) {
           try {
@@ -1489,7 +1492,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
         }
         logger.warn(LocalizedMessage.create(
             LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_REQUEST_REJECTED_BY_POOL,
-            new Object[] {serverConn}));
+            new Object[]{serverConn}));
         try {
           ServerHandShakeProcessor.refuse(socket.getOutputStream(),
               LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0
@@ -1664,10 +1667,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * @param bindName the ip address or host name that this acceptor should bind to. If null or ""
-   *        then calculate it.
+   * then calculate it.
    * @return the ip address or host name this acceptor will listen on. An "" if all local addresses
-   *         will be listened to.
-   *
+   * will be listened to.
    * @since GemFire 5.7
    */
   private static String calcBindHostName(Cache cache, String bindName) {
@@ -1704,7 +1706,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * Gets the address that this bridge server can be contacted on from external processes.
-   *
    * @since GemFire 5.7
    */
   public String getExternalAddress() {
@@ -1738,7 +1739,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * This method finds a client notifier and returns it. It is used to propagate interest
    * registrations to other servers
-   *
    * @return the instance that provides client notification
    */
   @Override
@@ -1796,7 +1796,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
    * This method returns a thread safe structure which can be iterated over without worrying about
    * ConcurrentModificationException. JMX MBeans/Commands need to iterate over this list to get
    * client info.
-   *
    */
   public ServerConnection[] getAllServerConnectionList() {
     return this.allSCList;
@@ -1820,4 +1819,43 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
     releaseCommBuffer(Message.setTLCommBuffer(null));
   }
+
+  private class ClientQueueInitializerTask implements Runnable {
+    private final Socket socket;
+    private final boolean isPrimaryServerToClient;
+    private final AcceptorImpl acceptor;
+
+    public ClientQueueInitializerTask(Socket socket, boolean isPrimaryServerToClient,
+                                      AcceptorImpl acceptor) {
+      this.socket = socket;
+      this.acceptor = acceptor;
+      this.isPrimaryServerToClient = isPrimaryServerToClient;
+    }
+
+    @Override
+    public void run() {
+      logger.info(":Bridge server: Initializing {} server-to-client communication socket: {}",
+          isPrimaryServerToClient ? "primary" : "secondary", socket);
+      try {
+        acceptor.getCacheClientNotifier()
+            .registerClient(socket, isPrimaryServerToClient, acceptor.getAcceptorId(),
+                acceptor.isNotifyBySubscription());
+      } catch (IOException ex) {
+        closeSocket(socket);
+        if (isRunning()) {
+          if (!acceptor.loggedAcceptError) {
+            acceptor.loggedAcceptError = true;
+            if (ex instanceof SocketTimeoutException) {
+              logger.warn(LocalizedMessage.create(
+                  LocalizedStrings.AcceptorImpl_CACHE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION_DUE_TO_SOCKET_TIMEOUT));
+            } else {
+              logger.warn(LocalizedMessage.create(
+                  LocalizedStrings.AcceptorImpl_CACHE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION__0,
+                  ex), ex);
+            }
+          }
+        }
+      }
+    }
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 5c69769..1cac128 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -1191,13 +1191,13 @@ public abstract class ServerConnection implements Runnable {
   }
 
   private void initializeClientNofication() throws IOException {
-    if (communicationMode.isSubscriptionFeed()) {
-      boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
-      logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
-          primary ? "primary" : "secondary", theSocket);
-      getAcceptor().getCacheClientNotifier().registerClient(theSocket, primary,
-          getAcceptor().getAcceptorId(), getAcceptor().isNotifyBySubscription());
-    }
+//    if (communicationMode.isSubscriptionFeed()) {
+//      boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
+//      logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
+//          primary ? "primary" : "secondary", theSocket);
+//      getAcceptor().getCacheClientNotifier().registerClient(theSocket, primary,
+//          getAcceptor().getAcceptorId(), getAcceptor().isNotifyBySubscription());
+//    }
   }
 
   /**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/AcceptorImplDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/AcceptorImplDUnitTest.java
new file mode 100644
index 0000000..9da6ca9
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/AcceptorImplDUnitTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.server.ClientSubscriptionConfig;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.internal.cache.DiskStoreAttributes;
+import org.apache.geode.internal.cache.InitialImageOperation;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.dunit.rules.SharedCountersRule;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+@Category(DistributedTest.class)
+public class AcceptorImplDUnitTest implements Serializable {
+  private final Host host = Host.getHost(0);
+  private final static int numberOfEntries = 1000;
+  private final static AtomicInteger eventCount = new AtomicInteger(0);
+
+  @ClassRule
+  public static DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+  @Rule
+  public CacheRule cacheRule = CacheRule.builder().createCacheIn(host.getVM(0)).createCacheIn(
+      host.getVM(1)).build();
+
+  @Rule
+  public SerializableTestName name = new SerializableTestName();
+
+  @Rule
+  public SerializableTemporaryFolder tempDir = new SerializableTemporaryFolder();
+
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties = new DistributedRestoreSystemProperties();
+
+  @Rule
+  public SharedCountersRule sharedCountersRule = SharedCountersRule.builder().build();
+
+  @Before
+  public void setup() {
+    Host host = this.host;
+    sharedCountersRule.initialize("startNow");
+    host.getAllVMs().forEach((vm) ->
+        vm.invoke(() -> {
+          System.setProperty("BridgeServer.HANDSHAKE_POOL_SIZE", "1");
+        })
+    );
+  }
+
+  @After
+  public void tearDown() {
+
+    host.getAllVMs().forEach((vm) ->
+        vm.invoke(() -> {
+          InitialImageOperation.slowImageProcessing = 0;
+        })
+    );
+  }
+
+  @Test
+  public void testClientSubscriptionQueueBlockingConnectionInitialization() throws Exception {
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+    int vm0_port = vm0.invoke("Start server with subscription turned on", () -> {
+      try {
+        return createSubscriptionServer(cacheRule.getCache());
+      } catch (IOException e) {
+        return 0;
+      }
+    });
+
+    vm2.invoke("Start Client1 with durable interest registration turned on", () -> {
+      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+      clientCacheFactory.setPoolSubscriptionEnabled(true);
+      clientCacheFactory.setPoolSubscriptionRedundancy(1);
+      clientCacheFactory.setPoolReadTimeout(200);
+      clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
+      ClientCache
+          cache =
+          clientCacheFactory.set("durable-client-id", "1").set("durable-client-timeout", "300")
+              .set("mcast-port", "0").create();
+      ClientRegionFactory<Object, Object> clientRegionFactory =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region region = clientRegionFactory.create("subscriptionRegion");
+
+      region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+      cache.readyForEvents();
+      cache.close(true);
+    });
+    vm3.invoke("Start Client2 to add entries to region", () -> {
+      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+      clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
+      ClientCache cache = clientCacheFactory.set("mcast-port", "0").create();
+      ClientRegionFactory<Object, Object> clientRegionFactory =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region region = clientRegionFactory.create("subscriptionRegion");
+
+      for (int i = 0; i < numberOfEntries; i++) {
+        region.put(i, i);
+      }
+      cache.close();
+    });
+
+    int vm1_port = vm1.invoke("Start server2 in with subscriptions turned on", () -> {
+      try {
+        int serverPort = createSubscriptionServer(cacheRule.getCache());
+        InitialImageOperation.slowImageProcessing = 30;
+        return serverPort;
+      } catch (IOException e) {
+        return 0;
+      }
+    });
+
+    vm0.invoke("Turn on slow image processsing", () -> {
+      InitialImageOperation.slowImageProcessing = 30;
+    });
+
+    vm2.invokeAsync("Start Client1, expecting durable messages to be delivered", () -> {
+
+      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+      clientCacheFactory.setPoolSubscriptionEnabled(true);
+      clientCacheFactory.setPoolSubscriptionRedundancy(1);
+      clientCacheFactory.setPoolMinConnections(1);
+      clientCacheFactory.setPoolMaxConnections(1);
+      clientCacheFactory.setPoolReadTimeout(200);
+      clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
+      clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
+      sharedCountersRule.increment("startNow");
+      ClientCache
+          cache =
+          clientCacheFactory.set("durable-client-id", "1").set("durable-client-timeout", "300")
+              .set("mcast-port", "0").create();
+      ClientRegionFactory<Object, Object> clientRegionFactory =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region region = clientRegionFactory
+          .addCacheListener(new CacheListenerAdapter() {
+            @Override
+            public void afterCreate(EntryEvent event) {
+              eventCount.incrementAndGet();
+            }
+
+            @Override
+            public void afterUpdate(EntryEvent event) {
+              eventCount.incrementAndGet();
+            }
+          })
+          .create("subscriptionRegion");
+
+      region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+      cache.readyForEvents();
+    });
+
+    AsyncInvocation<Boolean>
+        completed =
+        vm3.invokeAsync("Start Client2 to add entries to region", () -> {
+          while (true) {
+            Thread.sleep(100);
+            if (sharedCountersRule.getTotal("startNow") == 1) {
+              break;
+            }
+          }
+          ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+          clientCacheFactory.setPoolRetryAttempts(0);
+          clientCacheFactory.setPoolMinConnections(1);
+          clientCacheFactory.setPoolMaxConnections(1);
+          clientCacheFactory.setPoolReadTimeout(200);
+          clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
+          clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
+          ClientCache cache = clientCacheFactory.set("mcast-port", "0").create();
+          ClientRegionFactory<Object, Object> clientRegionFactory =
+              cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+          Region region = clientRegionFactory.create("subscriptionRegion");
+
+          for (int i = 0; i < 100; i++) {
+            System.err.println("i = " + region.get(i));
+          }
+          cache.close();
+          return true;
+        });
+    Awaitility.await().atMost(40, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
+        .until(() -> vm2.invoke(() -> {
+          return eventCount.get();
+        }) == numberOfEntries);
+    assertTrue(completed.get());
+  }
+
+  private int createSubscriptionServer(InternalCache cache) throws IOException {
+    initializeDiskStore(cache);
+    initializeReplicateRegion(cache);
+    return initializeCacheServerWithSubscription(host, cache);
+  }
+
+  private void initializeDiskStore(InternalCache cache) throws IOException {
+    DiskStoreAttributes diskStoreAttributes = new DiskStoreAttributes();
+    diskStoreAttributes.name = "clientQueueDS";
+    diskStoreAttributes.diskDirs = new File[]{tempDir.newFolder(name + "_dir")};
+    cache.createDiskStoreFactory(diskStoreAttributes).create("clientQueueDS");
+  }
+
+  private void initializeReplicateRegion(InternalCache cache) {
+    cache.createRegionFactory(RegionShortcut.REPLICATE).setStatisticsEnabled(true)
+        .setSubscriptionAttributes(new SubscriptionAttributes(
+            InterestPolicy.ALL)).create("subscriptionRegion");
+  }
+
+  private int initializeCacheServerWithSubscription(Host host, InternalCache cache)
+      throws IOException {
+    CacheServer cacheServer1 = cache.addCacheServer(false);
+    ClientSubscriptionConfig clientSubscriptionConfig =
+        cacheServer1.getClientSubscriptionConfig();
+    clientSubscriptionConfig.setEvictionPolicy("entry");
+    clientSubscriptionConfig.setCapacity(5);
+    clientSubscriptionConfig.setDiskStoreName("clientQueueDS");
+    cacheServer1.setPort(0);
+    cacheServer1.setHostnameForClients(host.getHostName());
+    cacheServer1.start();
+    return cacheServer1.getPort();
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/SSLConfigJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/SSLConfigJUnitTest.java
index 357736b..8904380 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/SSLConfigJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/SSLConfigJUnitTest.java
@@ -21,6 +21,7 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -44,7 +45,8 @@ public class SSLConfigJUnitTest {
   private static final Properties GATEWAY_SSL_PROPS_MAP = new Properties();
   private static final Properties GATEWAY_PROPS_SUBSET_MAP = new Properties();
 
-  static {
+  @BeforeClass
+  public static void initializeSSLMaps() {
 
     SSL_PROPS_MAP.put("javax.net.ssl.keyStoreType", "jks");
     SSL_PROPS_MAP.put("javax.net.ssl.keyStore", "/export/gemfire-configs/gemfire.keystore");
@@ -383,7 +385,6 @@ public class SSLConfigJUnitTest {
     gemFireProps.put(CLUSTER_SSL_CIPHERS, sslciphers);
     gemFireProps.put(CLUSTER_SSL_REQUIRE_AUTHENTICATION, String.valueOf(requireAuth));
 
-
     gemFireProps.put(SERVER_SSL_ENABLED, String.valueOf(cacheServerSslenabled));
     gemFireProps.put(SERVER_SSL_PROTOCOLS, cacheServerSslprotocols);
     gemFireProps.put(SERVER_SSL_CIPHERS, cacheServerSslciphers);

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.