You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/10/31 20:55:52 UTC

[geode] 02/02: GEODE-3637: Final commit with test to confirm asynchronous client queue creation

This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/GEODE-3637
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 6e4e70282a34ce5299d6968994d1be34ded4955a
Author: kohlmu-pivotal <uk...@pivotal.io>
AuthorDate: Tue Oct 31 13:55:42 2017 -0700

    GEODE-3637: Final commit with test to confirm asynchronous client queue creation
---
 .../internal/cache/tier/sockets/AcceptorImpl.java  | 105 ++++++-------
 .../cache/tier/sockets/ServerConnection.java       |  12 --
 .../sockets/AcceptorImplClientQueueDUnitTest.java} | 168 ++++++++++-----------
 .../apache/geode/test/dunit/rules/CacheRule.java   |  22 ++-
 4 files changed, 154 insertions(+), 153 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 721874f..193b300 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -95,6 +95,7 @@ import org.apache.geode.internal.util.ArrayUtils;
 /**
  * Implements the acceptor thread on the bridge server. Accepts connections from the edge and starts
  * up threads to process requests from these.
+ * 
  * @since GemFire 2.0.2
  */
 @SuppressWarnings("deprecation")
@@ -102,6 +103,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   private static final Logger logger = LogService.getLogger();
 
   private static final boolean isJRockit = System.getProperty("java.vm.name").contains("JRockit");
+  private static final int HANDSHAKER_DEFAULT_POOL_SIZE = 4;
 
   protected final CacheServerStats stats;
   private final int maxConnections;
@@ -272,6 +274,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * The ip address or host name this acceptor is to bind to; <code>null</code> or "" indicates it
    * will listen on all local addresses.
+   * 
    * @since GemFire 5.7
    */
   private final String bindHostName;
@@ -311,13 +314,14 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * Initializes this acceptor thread to listen for connections on the given port.
+   * 
    * @param port The port on which this acceptor listens for connections. If <code>0</code>, a
-   * random port will be chosen.
+   *        random port will be chosen.
    * @param bindHostName The ip address or host name this acceptor listens on for connections. If
-   * <code>null</code> or "" then all local addresses are used
+   *        <code>null</code> or "" then all local addresses are used
    * @param socketBufferSize The buffer size for server-side sockets
    * @param maximumTimeBetweenPings The maximum time between client pings. This value is used by the
-   * <code>ClientHealthMonitor</code> to monitor the health of this server's clients.
+   *        <code>ClientHealthMonitor</code> to monitor the health of this server's clients.
    * @param internalCache The GemFire cache whose contents is served to clients
    * @param maxConnections the maximum number of connections allowed in the server pool
    * @param maxThreads the maximum number of threads allowed in the server pool
@@ -326,14 +330,11 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
    * @since GemFire 5.7
    */
   public AcceptorImpl(int port, String bindHostName, boolean notifyBySubscription,
-                      int socketBufferSize, int maximumTimeBetweenPings,
-                      InternalCache internalCache,
-                      int maxConnections, int maxThreads, int maximumMessageCount,
-                      int messageTimeToLive,
-                      ConnectionListener listener, List overflowAttributesList,
-                      boolean isGatewayReceiver,
-                      List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay,
-                      ServerConnectionFactory serverConnectionFactory) throws IOException {
+      int socketBufferSize, int maximumTimeBetweenPings, InternalCache internalCache,
+      int maxConnections, int maxThreads, int maximumMessageCount, int messageTimeToLive,
+      ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver,
+      List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay,
+      ServerConnectionFactory serverConnectionFactory) throws IOException {
     this.securityService = internalCache.getSecurityService();
     this.bindHostName = calcBindHostName(internalCache, bindHostName);
     this.connectionListener = listener == null ? new ConnectionListenerAdapter() : listener;
@@ -445,7 +446,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
         // fix for bug 36617. If BindException is thrown, retry after
         // sleeping. The server may have been stopped and then
         // immediately restarted, which sometimes results in a bind exception
-        for (; ; ) {
+        for (;;) {
           try {
             this.serverSock.bind(new InetSocketAddress(getBindAddress(), port), backLog);
             break;
@@ -474,7 +475,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
         // fix for bug 36617. If BindException is thrown, retry after
         // sleeping. The server may have been stopped and then
         // immediately restarted, which sometimes results in a bind exception
-        for (; ; ) {
+        for (;;) {
           try {
             this.serverSock = this.socketCreator.createServerSocket(port, backLog, getBindAddress(),
                 this.gatewayTransportFilters, socketBufferSize);
@@ -519,7 +520,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
       String sockName = getServerName();
       logger.info(LocalizedMessage.create(
           LocalizedStrings.AcceptorImpl_CACHE_SERVER_CONNECTION_LISTENER_BOUND_TO_ADDRESS_0_WITH_BACKLOG_1,
-          new Object[]{sockName, Integer.valueOf(backLog)}));
+          new Object[] {sockName, Integer.valueOf(backLog)}));
       if (isGatewayReceiver) {
         this.stats = GatewayReceiverStats.createGatewayReceiverStats(sockName);
       } else {
@@ -581,6 +582,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
           }
         }
       };
+      logger.warn("Handshaker max Pool size: " + HANDSHAKE_POOL_SIZE);
       return new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, blockingQueue,
           socketThreadFactory, rejectedExecutionHandler);
     } catch (IllegalArgumentException poolInitException) {
@@ -592,16 +594,15 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   }
 
   private ThreadPoolExecutor initializeClientQueueInitializerThreadPool() throws IOException {
-    final ThreadGroup clientQueueThreadGroup = LoggingThreadGroup.createThreadGroup(
-        "Client Queue Initialization ", logger);
+    final ThreadGroup clientQueueThreadGroup =
+        LoggingThreadGroup.createThreadGroup("Client Queue Initialization ", logger);
 
     ThreadFactory clientQueueThreadFactory = new ThreadFactory() {
       AtomicInteger connNum = new AtomicInteger(-1);
 
       @Override
       public Thread newThread(final Runnable command) {
-        String
-            threadName =
+        String threadName =
             clientQueueThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
         Runnable runnable = new Runnable() {
           public void run() {
@@ -616,8 +617,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
       }
     };
     try {
-      return new ThreadPoolExecutor(1, 16, 60,
-          TimeUnit.SECONDS, new SynchronousQueue(), clientQueueThreadFactory);
+      return new ThreadPoolExecutor(1, 16, 60, TimeUnit.SECONDS, new SynchronousQueue(),
+          clientQueueThreadFactory);
     } catch (IllegalArgumentException poolInitException) {
       throw poolInitException;
     }
@@ -683,6 +684,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * This system property is only used if max-threads == 0. This is for 5.0.2 backwards
    * compatibility.
+   * 
    * @deprecated since 5.1 use cache-server max-threads instead
    */
   @Deprecated
@@ -691,13 +693,14 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * This system property is only used if max-threads == 0. This is for 5.0.2 backwards
    * compatibility.
+   * 
    * @deprecated since 5.1 use cache-server max-threads instead
    */
   @Deprecated
   private final static int DEPRECATED_SELECTOR_POOL_SIZE =
       Integer.getInteger("BridgeServer.SELECTOR_POOL_SIZE", 16).intValue();
-  private final static int HANDSHAKE_POOL_SIZE =
-      Integer.getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", 4).intValue();
+  private final static int HANDSHAKE_POOL_SIZE = Integer
+      .getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", HANDSHAKER_DEFAULT_POOL_SIZE).intValue();
 
   @Override
   public void start() throws IOException {
@@ -828,6 +831,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * Ensure that the CachedRegionHelper and ServerConnection classes get loaded.
+   * 
    * @see SystemFailure#loadEmergencyClasses()
    */
   public static void loadEmergencyClasses() {
@@ -1189,6 +1193,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * The work loop of this acceptor
+   * 
    * @see #accept
    */
   public void run() {
@@ -1229,8 +1234,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   }
 
   /**
-   * {@linkplain ServerSocket#accept Listens}for a client to connect and then creates a new {@link
-   * ServerConnection}to handle messages from that client.
+   * {@linkplain ServerSocket#accept Listens} for a client to connect and then creates a new
+   * {@link ServerConnection} to handle messages from that client.
    */
   @Override
   public void accept() {
@@ -1318,7 +1323,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
    * blocking is good because it will throttle the rate at which we create new connections.
    */
   private void handOffNewClientConnection(final Socket socket,
-                                          final ServerConnectionFactory serverConnectionFactory) {
+      final ServerConnectionFactory serverConnectionFactory) {
     try {
       this.stats.incAcceptsInProgress();
       this.hsPool.execute(new Runnable() {
@@ -1403,8 +1408,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   }
 
   protected void handleNewClientConnection(final Socket socket,
-                                           final ServerConnectionFactory serverConnectionFactory)
-      throws IOException {
+      final ServerConnectionFactory serverConnectionFactory) throws IOException {
     // Read the first byte. If this socket is being used for 'client to server'
     // communication, create a ServerConnection. If this socket is being used
     // for 'server to client' communication, send it to the CacheClientNotifier
@@ -1425,24 +1429,11 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
       throw new EOFException();
     }
 
-    // GEODE-3637
-    if (communicationMode.isSubscriptionFeed()) {
-      boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
-      logger.info(
-          ":Bridge server: Initializing {} server-to-client communication socket: {} using selector={}",
-          primary ? "primary" : "secondary", socket, isSelector());
-      AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId,
-          this.notifyBySubscription);
+    // GEODE-3637 - If the communication mode is a client subscription feed, hand off the client
+    // queue initialization to a separate thread pool
+    if (initializeClientPools(socket, communicationMode)) {
       return;
     }
-//    if (communicationMode.isSubscriptionFeed()) {
-//      boolean
-//          isPrimaryServerToClient =
-//          communicationMode == CommunicationMode.PrimaryServerToClient;
-//      clientQueueInitPool
-//          .execute(new ClientQueueInitializerTask(socket, isPrimaryServerToClient, this));
-//      return;
-//    }
 
     logger.debug("Bridge server: Initializing {} communication socket: {}", communicationMode,
         socket);
@@ -1452,7 +1443,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
       if (curCnt >= this.maxConnections) {
         logger.warn(LocalizedMessage.create(
             LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_CURRENT_CONNECTION_COUNT_OF_1_IS_GREATER_THAN_OR_EQUAL_TO_THE_CONFIGURED_MAX_OF_2,
-            new Object[]{socket.getInetAddress(), Integer.valueOf(curCnt),
+            new Object[] {socket.getInetAddress(), Integer.valueOf(curCnt),
                 Integer.valueOf(this.maxConnections)}));
         if (communicationMode.expectsConnectionRefusalMessage()) {
           try {
@@ -1492,7 +1483,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
         }
         logger.warn(LocalizedMessage.create(
             LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_REQUEST_REJECTED_BY_POOL,
-            new Object[]{serverConn}));
+            new Object[] {serverConn}));
         try {
           ServerHandShakeProcessor.refuse(socket.getOutputStream(),
               LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0
@@ -1506,6 +1497,17 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     }
   }
 
+  private boolean initializeClientPools(Socket socket, CommunicationMode communicationMode) {
+    if (communicationMode.isSubscriptionFeed()) {
+      boolean isPrimaryServerToClient =
+          communicationMode == CommunicationMode.PrimaryServerToClient;
+      clientQueueInitPool
+          .execute(new ClientQueueInitializerTask(socket, isPrimaryServerToClient, this));
+      return true;
+    }
+    return false;
+  }
+
   private CommunicationMode getCommunicationModeForNonSelector(Socket socket) throws IOException {
     socket.setSoTimeout(this.acceptTimeout);
     this.socketCreator.configureServerSSLSocket(socket);
@@ -1667,9 +1669,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * @param bindName the ip address or host name that this acceptor should bind to. If null or ""
-   * then calculate it.
+   *        then calculate it.
    * @return the ip address or host name this acceptor will listen on. An "" if all local addresses
-   * will be listened to.
+   *         will be listened to.
    * @since GemFire 5.7
    */
   private static String calcBindHostName(Cache cache, String bindName) {
@@ -1706,6 +1708,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * Gets the address that this bridge server can be contacted on from external processes.
+   * 
    * @since GemFire 5.7
    */
   public String getExternalAddress() {
@@ -1739,6 +1742,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * This method finds a client notifier and returns it. It is used to propagate interest
    * registrations to other servers
+   * 
    * @return the instance that provides client notification
    */
   @Override
@@ -1826,7 +1830,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     private final AcceptorImpl acceptor;
 
     public ClientQueueInitializerTask(Socket socket, boolean isPrimaryServerToClient,
-                                      AcceptorImpl acceptor) {
+        AcceptorImpl acceptor) {
       this.socket = socket;
       this.acceptor = acceptor;
       this.isPrimaryServerToClient = isPrimaryServerToClient;
@@ -1837,9 +1841,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
       logger.info(":Bridge server: Initializing {} server-to-client communication socket: {}",
           isPrimaryServerToClient ? "primary" : "secondary", socket);
       try {
-        acceptor.getCacheClientNotifier()
-            .registerClient(socket, isPrimaryServerToClient, acceptor.getAcceptorId(),
-                acceptor.isNotifyBySubscription());
+        acceptor.getCacheClientNotifier().registerClient(socket, isPrimaryServerToClient,
+            acceptor.getAcceptorId(), acceptor.isNotifyBySubscription());
       } catch (IOException ex) {
         closeSocket(socket);
         if (isRunning()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 1cac128..b141c6c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -1134,8 +1134,6 @@ public abstract class ServerConnection implements Runnable {
     if (getAcceptor().isSelector()) {
       boolean finishedMsg = false;
       try {
-        initializeClientNofication();
-
         this.stats.decThreadQueueSize();
         if (!isTerminated()) {
           getAcceptor().setTLCommBuffer();
@@ -1190,16 +1188,6 @@ public abstract class ServerConnection implements Runnable {
     }
   }
 
-  private void initializeClientNofication() throws IOException {
-//    if (communicationMode.isSubscriptionFeed()) {
-//      boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
-//      logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
-//          primary ? "primary" : "secondary", theSocket);
-//      getAcceptor().getCacheClientNotifier().registerClient(theSocket, primary,
-//          getAcceptor().getAcceptorId(), getAcceptor().isNotifyBySubscription());
-//    }
-  }
-
   /**
    * If registered with a selector then this will be the key we are registered with.
    */
diff --git a/geode-core/src/test/java/org/apache/geode/internal/AcceptorImplDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
similarity index 69%
rename from geode-core/src/test/java/org/apache/geode/internal/AcceptorImplDUnitTest.java
rename to geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
index 9da6ca9..f5a2e63 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/AcceptorImplDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
@@ -12,19 +12,22 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.internal;
+package org.apache.geode.internal.cache.tier.sockets;
 
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.rmi.RemoteException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -43,6 +46,8 @@ import org.apache.geode.cache.client.ClientRegionShortcut;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache.server.ClientSubscriptionConfig;
 import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.DistributedLockBlackboard;
+import org.apache.geode.distributed.DistributedLockBlackboardImpl;
 import org.apache.geode.internal.cache.DiskStoreAttributes;
 import org.apache.geode.internal.cache.InitialImageOperation;
 import org.apache.geode.internal.cache.InternalCache;
@@ -58,17 +63,19 @@ import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolde
 import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
 
 @Category(DistributedTest.class)
-public class AcceptorImplDUnitTest implements Serializable {
+public class AcceptorImplClientQueueDUnitTest implements Serializable {
   private final Host host = Host.getHost(0);
-  private final static int numberOfEntries = 1000;
+  private final static int numberOfEntries = 200;
   private final static AtomicInteger eventCount = new AtomicInteger(0);
+  private final static AtomicBoolean completedClient2 = new AtomicBoolean(false);
 
   @ClassRule
   public static DistributedTestRule distributedTestRule = new DistributedTestRule();
 
   @Rule
-  public CacheRule cacheRule = CacheRule.builder().createCacheIn(host.getVM(0)).createCacheIn(
-      host.getVM(1)).build();
+  public CacheRule cacheRule =
+      CacheRule.builder().createCacheIn(host.getVM(0)).createCacheIn(host.getVM(1))
+          .addSystemProperty("BridgeServer.HANDSHAKE_POOL_SIZE", "1").build();
 
   @Rule
   public SerializableTestName name = new SerializableTestName();
@@ -77,30 +84,23 @@ public class AcceptorImplDUnitTest implements Serializable {
   public SerializableTemporaryFolder tempDir = new SerializableTemporaryFolder();
 
   @Rule
-  public DistributedRestoreSystemProperties restoreSystemProperties = new DistributedRestoreSystemProperties();
+  public DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
 
-  @Rule
-  public SharedCountersRule sharedCountersRule = SharedCountersRule.builder().build();
+  private DistributedLockBlackboard blackboard = null;
 
   @Before
-  public void setup() {
-    Host host = this.host;
-    sharedCountersRule.initialize("startNow");
-    host.getAllVMs().forEach((vm) ->
-        vm.invoke(() -> {
-          System.setProperty("BridgeServer.HANDSHAKE_POOL_SIZE", "1");
-        })
-    );
+  public void setup() throws Exception {
+    blackboard = DistributedLockBlackboardImpl.getInstance();
   }
 
   @After
-  public void tearDown() {
-
-    host.getAllVMs().forEach((vm) ->
-        vm.invoke(() -> {
-          InitialImageOperation.slowImageProcessing = 0;
-        })
-    );
+  public void tearDown() throws RemoteException {
+    blackboard.initCount();
+    host.getAllVMs().forEach((vm) -> vm.invoke(() -> {
+      InitialImageOperation.slowImageProcessing = 0;
+      System.getProperties().remove("BridgeServer.HANDSHAKE_POOL_SIZE");
+    }));
   }
 
   @Test
@@ -123,10 +123,8 @@ public class AcceptorImplDUnitTest implements Serializable {
       clientCacheFactory.setPoolSubscriptionRedundancy(1);
       clientCacheFactory.setPoolReadTimeout(200);
       clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
-      ClientCache
-          cache =
-          clientCacheFactory.set("durable-client-id", "1").set("durable-client-timeout", "300")
-              .set("mcast-port", "0").create();
+      ClientCache cache = clientCacheFactory.set("durable-client-id", "1")
+          .set("durable-client-timeout", "300").set("mcast-port", "0").create();
       ClientRegionFactory<Object, Object> clientRegionFactory =
           cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
       Region region = clientRegionFactory.create("subscriptionRegion");
@@ -163,25 +161,24 @@ public class AcceptorImplDUnitTest implements Serializable {
       InitialImageOperation.slowImageProcessing = 30;
     });
 
-    vm2.invokeAsync("Start Client1, expecting durable messages to be delivered", () -> {
+    AsyncInvocation<Boolean> completedClient1 =
+        vm2.invokeAsync("Start Client1, expecting durable messages to be delivered", () -> {
 
-      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
-      clientCacheFactory.setPoolSubscriptionEnabled(true);
-      clientCacheFactory.setPoolSubscriptionRedundancy(1);
-      clientCacheFactory.setPoolMinConnections(1);
-      clientCacheFactory.setPoolMaxConnections(1);
-      clientCacheFactory.setPoolReadTimeout(200);
-      clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
-      clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
-      sharedCountersRule.increment("startNow");
-      ClientCache
-          cache =
-          clientCacheFactory.set("durable-client-id", "1").set("durable-client-timeout", "300")
-              .set("mcast-port", "0").create();
-      ClientRegionFactory<Object, Object> clientRegionFactory =
-          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
-      Region region = clientRegionFactory
-          .addCacheListener(new CacheListenerAdapter() {
+          ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+          clientCacheFactory.setPoolSubscriptionEnabled(true);
+          clientCacheFactory.setPoolSubscriptionRedundancy(1);
+          clientCacheFactory.setPoolMinConnections(1);
+          clientCacheFactory.setPoolMaxConnections(1);
+          clientCacheFactory.setPoolReadTimeout(200);
+          clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
+          ClientCacheFactory cacheFactory = clientCacheFactory.set("durable-client-id", "1")
+              .set("durable-client-timeout", "300").set("mcast-port", "0");
+          blackboard.incCount();
+          ClientCache cache = cacheFactory.create();
+
+          ClientRegionFactory<Object, Object> clientRegionFactory =
+              cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+          Region region = clientRegionFactory.addCacheListener(new CacheListenerAdapter() {
             @Override
             public void afterCreate(EntryEvent event) {
               eventCount.incrementAndGet();
@@ -191,45 +188,45 @@ public class AcceptorImplDUnitTest implements Serializable {
             public void afterUpdate(EntryEvent event) {
               eventCount.incrementAndGet();
             }
-          })
-          .create("subscriptionRegion");
-
-      region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
-      cache.readyForEvents();
-    });
+          }).create("subscriptionRegion");
 
-    AsyncInvocation<Boolean>
-        completed =
-        vm3.invokeAsync("Start Client2 to add entries to region", () -> {
-          while (true) {
-            Thread.sleep(100);
-            if (sharedCountersRule.getTotal("startNow") == 1) {
-              break;
-            }
-          }
-          ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
-          clientCacheFactory.setPoolRetryAttempts(0);
-          clientCacheFactory.setPoolMinConnections(1);
-          clientCacheFactory.setPoolMaxConnections(1);
-          clientCacheFactory.setPoolReadTimeout(200);
-          clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
-          clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
-          ClientCache cache = clientCacheFactory.set("mcast-port", "0").create();
-          ClientRegionFactory<Object, Object> clientRegionFactory =
-              cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
-          Region region = clientRegionFactory.create("subscriptionRegion");
-
-          for (int i = 0; i < 100; i++) {
-            System.err.println("i = " + region.get(i));
-          }
+          region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+          cache.readyForEvents();
+          Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
+              .until(() -> eventCount.get() == numberOfEntries);
           cache.close();
-          return true;
+          return eventCount.get() == numberOfEntries;
         });
-    Awaitility.await().atMost(40, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
-        .until(() -> vm2.invoke(() -> {
-          return eventCount.get();
-        }) == numberOfEntries);
-    assertTrue(completed.get());
+
+    vm3.invokeAsync("Start Client2 to add entries to region", () -> {
+      while (true) {
+        Thread.sleep(100);
+        if (blackboard.getCount() == 1) {
+          break;
+        }
+      }
+      ClientCache cache = null;
+      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+      clientCacheFactory.setPoolRetryAttempts(0);
+      clientCacheFactory.setPoolMinConnections(1);
+      clientCacheFactory.setPoolMaxConnections(1);
+      clientCacheFactory.setPoolReadTimeout(200);
+      clientCacheFactory.setPoolSocketConnectTimeout(500);
+      clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
+      cache = clientCacheFactory.set("mcast-port", "0").create();
+      ClientRegionFactory<Object, Object> clientRegionFactory =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region region = clientRegionFactory.create("subscriptionRegion");
+
+      int returnValue = 0;
+      for (int i = 0; i < 100; i++) {
+        returnValue = (int) region.get(i);
+      }
+      cache.close();
+      completedClient2.set(returnValue == 99);
+    });
+    assertTrue(completedClient1.get());
+    assertTrue(vm3.invoke(() -> completedClient2.get()));
   }
 
   private int createSubscriptionServer(InternalCache cache) throws IOException {
@@ -241,21 +238,20 @@ public class AcceptorImplDUnitTest implements Serializable {
   private void initializeDiskStore(InternalCache cache) throws IOException {
     DiskStoreAttributes diskStoreAttributes = new DiskStoreAttributes();
     diskStoreAttributes.name = "clientQueueDS";
-    diskStoreAttributes.diskDirs = new File[]{tempDir.newFolder(name + "_dir")};
+    diskStoreAttributes.diskDirs = new File[] {tempDir.newFolder(name + "_dir")};
     cache.createDiskStoreFactory(diskStoreAttributes).create("clientQueueDS");
   }
 
   private void initializeReplicateRegion(InternalCache cache) {
     cache.createRegionFactory(RegionShortcut.REPLICATE).setStatisticsEnabled(true)
-        .setSubscriptionAttributes(new SubscriptionAttributes(
-            InterestPolicy.ALL)).create("subscriptionRegion");
+        .setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL))
+        .create("subscriptionRegion");
   }
 
   private int initializeCacheServerWithSubscription(Host host, InternalCache cache)
       throws IOException {
     CacheServer cacheServer1 = cache.addCacheServer(false);
-    ClientSubscriptionConfig clientSubscriptionConfig =
-        cacheServer1.getClientSubscriptionConfig();
+    ClientSubscriptionConfig clientSubscriptionConfig = cacheServer1.getClientSubscriptionConfig();
     clientSubscriptionConfig.setEvictionPolicy("entry");
     clientSubscriptionConfig.setCapacity(5);
     clientSubscriptionConfig.setDiskStoreName("clientQueueDS");
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
index dc42da8..b65bf86 100644
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
@@ -63,6 +63,7 @@ public class CacheRule extends DistributedExternalResource {
   private final boolean disconnectAfter;
   private final List<VM> createCacheInVMs;
   private final Properties config;
+  private final Properties systemProperties;
 
   public static Builder builder() {
     return new Builder();
@@ -74,18 +75,19 @@ public class CacheRule extends DistributedExternalResource {
     this.disconnectAfter = builder.disconnectAfter;
     this.createCacheInVMs = builder.createCacheInVMs;
     this.config = builder.config;
+    this.systemProperties = builder.systemProperties;
   }
 
   @Override
   protected void before() {
     if (createCacheInAll) {
-      invoker().invokeInEveryVMAndController(() -> createCache(config));
+      invoker().invokeInEveryVMAndController(() -> createCache(config, systemProperties));
     } else {
       if (createCache) {
-        createCache(config);
+        createCache(config, systemProperties);
       }
       for (VM vm : createCacheInVMs) {
-        vm.invoke(() -> createCache(config));
+        vm.invoke(() -> createCache(config, systemProperties));
       }
     }
   }
@@ -108,7 +110,8 @@ public class CacheRule extends DistributedExternalResource {
     return cache.getInternalDistributedSystem();
   }
 
-  private static void createCache(final Properties config) {
+  private static void createCache(final Properties config, final Properties systemProperties) {
+    System.getProperties().putAll(systemProperties);
     cache = (InternalCache) new CacheFactory(config).create();
   }
 
@@ -141,6 +144,7 @@ public class CacheRule extends DistributedExternalResource {
     private boolean disconnectAfter;
     private List<VM> createCacheInVMs = new ArrayList<>();
     private Properties config = new Properties();
+    private Properties systemProperties = new Properties();
 
     public Builder() {
       config.setProperty(LOCATORS, getLocators());
@@ -195,6 +199,16 @@ public class CacheRule extends DistributedExternalResource {
       return this;
     }
 
+    public Builder addSystemProperty(final String key, final String value) {
+      this.systemProperties.put(key, value);
+      return this;
+    }
+
+    public Builder addSystemProperties(final Properties config) {
+      this.systemProperties.putAll(config);
+      return this;
+    }
+
     public CacheRule build() {
       return new CacheRule(this);
     }

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.