You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mh...@apache.org on 2022/04/28 22:10:31 UTC

[geode] branch develop updated: Bugfix/GEODE-10228 DurableClientTestCase.testDurableHAFailover is failing (#7608)

This is an automated email from the ASF dual-hosted git repository.

mhanson pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5f1d358db0 Bugfix/GEODE-10228 DurableClientTestCase.testDurableHAFailover is failing (#7608)
5f1d358db0 is described below

commit 5f1d358db0da813cc4df71906bf2977a87c0c7f4
Author: mhansonp <ha...@vmware.com>
AuthorDate: Thu Apr 28 15:10:24 2022 -0700

    Bugfix/GEODE-10228 DurableClientTestCase.testDurableHAFailover is failing (#7608)
    
    
    - The test was failing because it didn't wait for the
    HARegionQueue to clear before shutting down the durable
    client for test. Thus when it came back up, there was
    an extra message in the queue.
    
    - Reverse the order of readyforevents and registerinterest
    
    - adding a close for the control listener
    
    - Starting the server is not synchronous adjusted test accordingly
---
 .../ClientServerRegisterInterestsDUnitTest.java    | 169 ++++----
 .../sockets/DurableClientBug39997DUnitTest.java    | 138 -------
 ...bleClientNoServerAvailabileDistributedTest.java |  98 +++++
 .../tier/sockets/DurableClientStatsDUnitTest.java  | 232 +++++------
 .../DurableRegistrationDistributedTest.java        |  13 +-
 .../tier/sockets/DurableClientCQDUnitTest.java     | 315 +++++++--------
 .../sockets/DurableClientHAQueuedDUnitTest.java    |  20 +-
 .../tier/sockets/DurableClientSimpleDUnitTest.java | 425 +++++++--------------
 .../cache/tier/sockets/DurableClientTestBase.java  | 314 +++++++--------
 .../cache/tier/sockets/DurableClientTestCase.java  | 254 +++++++-----
 .../cache/tier/sockets/CacheServerTestUtil.java    |  81 ++--
 11 files changed, 874 insertions(+), 1185 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/client/ClientServerRegisterInterestsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/client/ClientServerRegisterInterestsDUnitTest.java
index ec97f6b7e9..0f484cab1d 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/client/ClientServerRegisterInterestsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/client/ClientServerRegisterInterestsDUnitTest.java
@@ -19,11 +19,8 @@ import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIEN
 import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE;
 import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.entry;
 
 import java.io.IOException;
 import java.net.UnknownHostException;
@@ -45,10 +42,7 @@ import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache.server.ClientSubscriptionConfig;
 import org.apache.geode.cache.util.CacheListenerAdapter;
 import org.apache.geode.internal.AvailablePortHelper;
-import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.dunit.SerializableCallable;
-import org.apache.geode.test.dunit.SerializableRunnable;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
 import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
@@ -68,89 +62,78 @@ public class ClientServerRegisterInterestsDUnitTest extends JUnit4DistributedTes
 
   private final AtomicInteger serverPort = new AtomicInteger(0);
 
-  private final Stack entryEvents = new Stack();
+  private final Stack<EntryEvent<String, String>> entryEvents = new Stack<>();
 
   private VM gemfireServerVm;
 
   @Override
-  public final void postSetUp() throws Exception {
+  public final void postSetUp() {
     disconnectAllFromDS();
     setupGemFireCacheServer();
     IgnoredException.addIgnoredException("java.net.ConnectException");
   }
 
   @Override
-  public final void preTearDown() throws Exception {
+  public final void preTearDown() {
     serverPort.set(0);
     entryEvents.clear();
-    gemfireServerVm.invoke(new SerializableRunnable() {
-      @Override
-      public void run() {
-        CacheFactory.getAnyInstance().close();
-      }
-    });
+    gemfireServerVm.invoke(() -> CacheFactory.getAnyInstance().close());
     gemfireServerVm = null;
   }
 
   private void setupGemFireCacheServer() {
-    Host localhost = Host.getHost(0);
 
-    gemfireServerVm = localhost.getVM(0);
+
+    gemfireServerVm = VM.getVM(0);
     serverPort.set(AvailablePortHelper.getRandomAvailableTCPPort());
 
-    gemfireServerVm.invoke(new SerializableRunnable() {
-      @Override
-      public void run() {
-        try {
-          Cache cache = new CacheFactory()
-              .set("name", "ClientServerRegisterInterestsTestGemFireServer").set(MCAST_PORT, "0")
-              .set(LOG_FILE, "clientServerRegisterInterestsTest.log").set(LOG_LEVEL, "config")
-              // .set("jmx-manager", "true")
-              // .set("jmx-manager-http-port", "0")
-              // .set("jmx-manager-port", "1199")
-              // .set("jmx-manager-start", "true")
-              .create();
+    gemfireServerVm.invoke(() -> {
+      try {
+        Cache cache = new CacheFactory()
+            .set("name", "ClientServerRegisterInterestsTestGemFireServer").set(MCAST_PORT, "0")
+            .set(LOG_FILE, "clientServerRegisterInterestsTest.log").set(LOG_LEVEL, "config")
+            .create();
 
-          RegionFactory<String, String> regionFactory = cache.createRegionFactory();
+        RegionFactory<String, String> regionFactory = cache.createRegionFactory();
 
-          regionFactory.setDataPolicy(DataPolicy.REPLICATE);
-          regionFactory.setKeyConstraint(String.class);
-          regionFactory.setValueConstraint(String.class);
+        regionFactory.setDataPolicy(DataPolicy.REPLICATE);
+        regionFactory.setKeyConstraint(String.class);
+        regionFactory.setValueConstraint(String.class);
 
-          Region<String, String> example = regionFactory.create("Example");
+        Region<String, String> example = regionFactory.create("Example");
 
-          assertNotNull("The 'Example' Region was not properly configured and initialized!",
-              example);
-          assertEquals(SEPARATOR + "Example", example.getFullPath());
-          assertEquals("Example", example.getName());
-          assertTrue(example.isEmpty());
+        assertThat(example)
+            .describedAs("The 'Example' Region was not properly configured and initialized!")
+            .isNotNull();
+        assertThat(example.getFullPath()).isEqualTo(SEPARATOR + "Example");
+        assertThat(example.getName()).isEqualTo("Example");
+        assertThat(example).isEmpty();
 
-          example.put("1", "ONE");
+        example.put("1", "ONE");
 
-          assertFalse(example.isEmpty());
-          assertEquals(1, example.size());
+        assertThat(example).isNotEmpty();
+        assertThat(example).hasSize(1);
 
-          CacheServer cacheServer = cache.addCacheServer();
+        CacheServer cacheServer = cache.addCacheServer();
 
-          cacheServer.setPort(serverPort.get());
-          cacheServer.setMaxConnections(10);
+        cacheServer.setPort(serverPort.get());
+        cacheServer.setMaxConnections(10);
 
-          ClientSubscriptionConfig clientSubscriptionConfig =
-              cacheServer.getClientSubscriptionConfig();
+        ClientSubscriptionConfig clientSubscriptionConfig =
+            cacheServer.getClientSubscriptionConfig();
 
-          clientSubscriptionConfig.setCapacity(100);
-          clientSubscriptionConfig.setEvictionPolicy("entry");
+        clientSubscriptionConfig.setCapacity(100);
+        clientSubscriptionConfig.setEvictionPolicy("entry");
 
-          cacheServer.start();
+        cacheServer.start();
 
-          assertTrue("Cache Server is not running!", cacheServer.isRunning());
-        } catch (UnknownHostException e) {
-          throw new RuntimeException(e);
-        } catch (IOException e) {
-          throw new RuntimeException(String.format(
-              "Failed to start the GemFire Cache Server listening on port (%1$d) due to IO error!",
-              serverPort.get()), e);
-        }
+        assertThat(cacheServer.isRunning()).describedAs("Cache Server is not running!").isTrue();
+      } catch (UnknownHostException e) {
+        throw new RuntimeException(e);
+      } catch (IOException e) {
+        throw new RuntimeException(String.format(
+            "Failed to start the GemFire Cache Server listening on port (%1$d) due to IO error!",
+            serverPort.get()), e);
       }
     });
   }
@@ -169,7 +152,9 @@ public class ClientServerRegisterInterestsDUnitTest extends JUnit4DistributedTes
 
     Pool pool = poolFactory.create("serverConnectionPool");
 
-    assertNotNull("The 'serverConnectionPool' was not properly configured and initialized!", pool);
+    assertThat(pool)
+        .describedAs("The 'serverConnectionPool' was not properly configured and initialized!")
+        .isNotNull();
 
     ClientRegionFactory<String, String> regionFactory =
         clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
@@ -181,8 +166,9 @@ public class ClientServerRegisterInterestsDUnitTest extends JUnit4DistributedTes
 
     Region<String, String> exampleCachingProxy = regionFactory.create("Example");
 
-    assertNotNull("The 'Example' Client Region was not properly configured and initialized",
-        exampleCachingProxy);
+    assertThat(exampleCachingProxy)
+        .describedAs("The 'Example' Client Region was not properly configured and initialized")
+        .isNotNull();
 
     clientCache.readyForEvents();
 
@@ -191,25 +177,23 @@ public class ClientServerRegisterInterestsDUnitTest extends JUnit4DistributedTes
     return clientCache;
   }
 
-  @SuppressWarnings("unchecked")
-  protected <K, V> V put(final String regionName, final K key, final V value) {
-    return (V) gemfireServerVm.invoke(new SerializableCallable() {
-      @Override
-      public Object call() throws Exception {
-        Cache cache = CacheFactory.getAnyInstance();
-        cache.getRegion(regionName).put(key, value);
-        return cache.getRegion(regionName).get(key);
-      }
+  protected <V> V put() {
+    return (V) gemfireServerVm.invoke(() -> {
+      Cache cache = CacheFactory.getAnyInstance();
+      cache.getRegion("/Example").put("2", "TWO");
+      return cache.getRegion("/Example").get("2");
     });
   }
 
-  protected void waitOnEvent(final long waitTimeMilliseconds) {
-    final long timeout = (System.currentTimeMillis() + waitTimeMilliseconds);
+  protected void waitOnEvent() {
+    final long timeout = (System.currentTimeMillis()
+        + ClientServerRegisterInterestsDUnitTest.WAIT_TIME_MILLISECONDS);
 
     while (entryEvents.empty() && (System.currentTimeMillis() < timeout)) {
       synchronized (this) {
         try {
-          TimeUnit.MILLISECONDS.timedWait(this, Math.min(500, waitTimeMilliseconds));
+          TimeUnit.MILLISECONDS.timedWait(this, Math.min(500,
+              ClientServerRegisterInterestsDUnitTest.WAIT_TIME_MILLISECONDS));
         } catch (InterruptedException ignore) {
         }
       }
@@ -218,35 +202,30 @@ public class ClientServerRegisterInterestsDUnitTest extends JUnit4DistributedTes
 
   @Test
   public void testClientRegisterInterests() {
-    ClientCache clientCache = setupGemFireClientCache();
 
-    try {
+    try (ClientCache clientCache = setupGemFireClientCache()) {
       Region<String, String> example = clientCache.getRegion(SEPARATOR + "Example");
 
-      assertNotNull("'Example' Region in Client Cache was not found!", example);
-      assertEquals(1, example.size());
-      assertTrue(example.containsKey("1"));
-      assertEquals("ONE", example.get("1"));
-      assertTrue(entryEvents.empty());
+      assertThat(example).describedAs("'Example' Region in Client Cache was not found!")
+          .isNotNull();
+      assertThat(example).containsOnly(entry("1", "ONE"));
+      assertThat(entryEvents).isEmpty();
 
-      String value = put(SEPARATOR + "Example", "2", "TWO");
+      String value = put();
 
-      assertEquals("TWO", value);
+      assertThat(value).isEqualTo("TWO");
 
-      waitOnEvent(WAIT_TIME_MILLISECONDS);
+      waitOnEvent();
 
-      assertFalse(entryEvents.empty());
+      assertThat(entryEvents).isNotEmpty();
 
-      EntryEvent entryEvent = (EntryEvent) entryEvents.pop();
+      EntryEvent<String, String> entryEvent = entryEvents.pop();
 
-      assertEquals("2", entryEvent.getKey());
-      assertEquals("TWO", entryEvent.getNewValue());
-      assertNull(entryEvent.getOldValue());
-      assertEquals(2, example.size());
-      assertTrue(example.containsKey("2"));
-      assertEquals("TWO", example.get("2"));
-    } finally {
-      clientCache.close();
+      assertThat(entryEvent.getKey()).isEqualTo("2");
+      assertThat(entryEvent.getNewValue()).isEqualTo("TWO");
+      assertThat(entryEvent.getOldValue()).isNull();
+      assertThat(example).hasSize(2);
+      assertThat(example).contains(entry("2", "TWO"));
     }
   }
 
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientBug39997DUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientBug39997DUnitTest.java
deleted file mode 100644
index f5a94ba3fd..0000000000
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientBug39997DUnitTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.tier.sockets;
-
-import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
-import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.AttributesFactory;
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.NoSubscriptionServersAvailableException;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.Scope;
-import org.apache.geode.cache.client.PoolManager;
-import org.apache.geode.cache.client.internal.PoolImpl;
-import org.apache.geode.cache.server.CacheServer;
-import org.apache.geode.internal.AvailablePortHelper;
-import org.apache.geode.test.awaitility.GeodeAwaitility;
-import org.apache.geode.test.dunit.Assert;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.NetworkUtils;
-import org.apache.geode.test.dunit.SerializableRunnable;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
-
-@Category({ClientSubscriptionTest.class})
-public class DurableClientBug39997DUnitTest extends JUnit4CacheTestCase {
-
-  @Override
-  public final void postTearDownCacheTestCase() {
-    Host.getHost(0).getVM(0).invoke(JUnit4DistributedTestCase::disconnectFromDS);
-  }
-
-  @Test
-  public void testNoServerAvailableOnStartup() {
-    Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
-
-    final String hostName = NetworkUtils.getServerHostName(host);
-    final int port = AvailablePortHelper.getRandomAvailableTCPPort();
-    vm0.invoke(new SerializableRunnable("create cache") {
-      @Override
-      public void run() {
-        getSystem(getClientProperties());
-        PoolImpl p = (PoolImpl) PoolManager.createFactory().addServer(hostName, port)
-            .setSubscriptionEnabled(true).setSubscriptionRedundancy(0)
-            .create("DurableClientReconnectDUnitTestPool");
-        AttributesFactory factory = new AttributesFactory();
-        factory.setScope(Scope.LOCAL);
-        factory.setPoolName(p.getName());
-        Cache cache = getCache();
-        Region region1 = cache.createRegion("region", factory.create());
-        cache.readyForEvents();
-
-        try {
-          region1.registerInterest("ALL_KEYS");
-          fail("Should have received an exception trying to register interest");
-        } catch (NoSubscriptionServersAvailableException expected) {
-          // this is expected
-        }
-      }
-    });
-
-    vm1.invoke(new SerializableRunnable() {
-      @Override
-      public void run() {
-        Cache cache = getCache();
-        AttributesFactory factory = new AttributesFactory();
-        factory.setScope(Scope.DISTRIBUTED_ACK);
-        cache.createRegion("region", factory.create());
-        CacheServer server = cache.addCacheServer();
-        server.setPort(port);
-        try {
-          server.start();
-        } catch (IOException e) {
-          Assert.fail("couldn't start server", e);
-        }
-      }
-    });
-
-    vm0.invoke(new SerializableRunnable() {
-      @Override
-      public void run() {
-        Cache cache = getCache();
-        final Region region = cache.getRegion("region");
-        GeodeAwaitility.await().untilAsserted(new WaitCriterion() {
-
-          @Override
-          public String description() {
-            return "Wait for register interest to succeed";
-          }
-
-          @Override
-          public boolean done() {
-            try {
-              region.registerInterest("ALL_KEYS");
-            } catch (NoSubscriptionServersAvailableException e) {
-              return false;
-            }
-            return true;
-          }
-
-        });
-      }
-    });
-  }
-
-  public Properties getClientProperties() {
-    Properties props = new Properties();
-    props.setProperty(MCAST_PORT, "0");
-    props.setProperty(LOCATORS, "");
-    props.setProperty(DURABLE_CLIENT_ID, "my_id");
-    return props;
-  }
-}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientNoServerAvailabileDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientNoServerAvailabileDistributedTest.java
new file mode 100644
index 0000000000..dafacd9de4
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientNoServerAvailabileDistributedTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Properties;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.NoSubscriptionServersAvailableException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+@Category({ClientSubscriptionTest.class})
+public class DurableClientNoServerAvailabileDistributedTest extends JUnit4CacheTestCase {
+
+  @Override
+  public final void postTearDownCacheTestCase() {
+    VM.getVM(0).invoke(JUnit4DistributedTestCase::disconnectFromDS);
+  }
+
+  @Test
+  public void testNoServerAvailableOnStartup() {
+    VM vm0 = VM.getVM(0);
+    VM vm1 = VM.getVM(1);
+
+    final String hostName = VM.getHostName();
+    final int port = AvailablePortHelper.getRandomAvailableTCPPort();
+    vm0.invoke("create cache", () -> {
+      getSystem(getClientProperties());
+      PoolImpl p = (PoolImpl) PoolManager.createFactory().addServer(hostName, port)
+          .setSubscriptionEnabled(true).setSubscriptionRedundancy(0)
+          .create("DurableClientReconnectDUnitTestPool");
+      Cache cache = getCache();
+      RegionFactory<Object, Object> regionFactory = cache.createRegionFactory();
+      regionFactory.setScope(Scope.LOCAL);
+      regionFactory.setPoolName(p.getName());
+      Region<Object, Object> region1 = regionFactory.create("region");
+      assertThrows(NoSubscriptionServersAvailableException.class,
+          () -> region1.registerInterestForAllKeys());
+
+    });
+
+    vm1.invoke(() -> {
+      Cache cache = getCache();
+      RegionFactory<Object, Object> regionFactory = cache.createRegionFactory();
+      regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+      regionFactory.create("region");
+      CacheServer server = cache.addCacheServer();
+      server.setPort(port);
+      assertDoesNotThrow(server::start);
+    });
+
+    vm0.invoke(() -> {
+      Cache cache = getCache();
+      final Region<Object, Object> region = cache.getRegion("region");
+      GeodeAwaitility.await("Wait for register interest to succeed")
+          .untilAsserted(() -> assertDoesNotThrow(() -> region.registerInterestForAllKeys()));
+    });
+  }
+
+  public Properties getClientProperties() {
+    Properties props = new Properties();
+    props.setProperty(MCAST_PORT, "0");
+    props.setProperty(LOCATORS, "");
+    props.setProperty(DURABLE_CLIENT_ID, "my_id");
+    return props;
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientStatsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientStatsDUnitTest.java
index e0b53dc768..a29aa40bb1 100755
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientStatsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientStatsDUnitTest.java
@@ -18,9 +18,16 @@ import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIEN
 import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static org.apache.geode.test.dunit.Assert.assertEquals;
-import static org.apache.geode.test.dunit.Assert.assertNotNull;
-import static org.apache.geode.test.dunit.Assert.fail;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheClient;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheServer;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.disableShufflingOfEndpoints;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.getCache;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.getClientCache;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.resetDisableShufflingOfEndpointsFlag;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.entry;
+import static org.assertj.core.api.Assertions.fail;
 
 import java.util.ArrayList;
 import java.util.Properties;
@@ -33,6 +40,7 @@ import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.InterestResultPolicy;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.client.Pool;
 import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.PoolManager;
@@ -40,8 +48,6 @@ import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.internal.cache.CacheServerImpl;
 import org.apache.geode.internal.cache.PoolFactoryImpl;
 import org.apache.geode.internal.cache.tier.Acceptor;
-import org.apache.geode.test.dunit.Assert;
-import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.NetworkUtils;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.Wait;
@@ -69,62 +75,57 @@ public class DurableClientStatsDUnitTest extends JUnit4DistributedTestCase {
 
   private int PORT1;
 
-  private final String K1 = "Key1";
-
   @Override
-  public final void postSetUp() throws Exception {
-    Host host = Host.getHost(0);
-    server1VM = host.getVM(0);
-    durableClientVM = host.getVM(1);
+  public final void postSetUp() {
+    server1VM = VM.getVM(0);
+    durableClientVM = VM.getVM(1);
     regionName = DurableClientStatsDUnitTest.class.getName() + "_region";
-    CacheServerTestUtil.disableShufflingOfEndpoints();
+    disableShufflingOfEndpoints();
   }
 
   @Override
-  public final void preTearDown() throws Exception {
+  public final void preTearDown() {
     // Stop server 1
     server1VM.invoke(() -> CacheServerTestUtil.closeCache());
-    CacheServerTestUtil.resetDisableShufflingOfEndpointsFlag();
+    resetDisableShufflingOfEndpointsFlag();
   }
 
   @Test
   public void testNonDurableClientStatistics() {
     // Step 1: Starting the servers
     PORT1 = server1VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+        .invoke(() -> createCacheServer(regionName, true));
     server1VM.invoke(DurableClientStatsDUnitTest::checkStatistics);
     // Step 2: Bring Up the Client
     // Start a durable client that is not kept alive on the server when it
     // stops normally
-    final int durableClientTimeout = 600; // keep the client alive for 600
-    // seconds
 
-    startAndCloseNonDurableClientCache(durableClientTimeout);
-    startAndCloseNonDurableClientCache(1); //////// -> Reconnection1
+    startAndCloseNonDurableClientCache();
+    startAndCloseNonDurableClientCache(); //////// -> Reconnection1
     Wait.pause(1400); //////// -> Queue Dropped1
-    startAndCloseNonDurableClientCache(1);
+    startAndCloseNonDurableClientCache();
     Wait.pause(1400); //////// -> Queue Dropped2
 
-    startRegisterAndCloseNonDurableClientCache(durableClientTimeout);
+    startRegisterAndCloseNonDurableClientCache();
     Wait.pause(500);
 
-    server1VM.invoke(() -> DurableClientStatsDUnitTest.putValue(K1, "Value1")); //////// ->
-                                                                                //////// Enqueue
-                                                                                //////// Message1
+    server1VM.invoke(() -> DurableClientStatsDUnitTest.putValue("Value1")); //////// ->
+    //////// Enqueue
+    //////// Message1
 
     Wait.pause(500);
-    startAndCloseNonDurableClientCache(1); //////// -> Reconnection2
+    startAndCloseNonDurableClientCache(); //////// -> Reconnection2
     Wait.pause(1400); //////// -> Queue Dropped3
-    startAndCloseNonDurableClientCache(1);
+    startAndCloseNonDurableClientCache();
     Wait.pause(1400); //////// -> Queue Dropped4
-    startRegisterAndCloseNonDurableClientCache(durableClientTimeout);
+    startRegisterAndCloseNonDurableClientCache();
     Wait.pause(500);
 
-    server1VM.invoke(() -> DurableClientStatsDUnitTest.putValue(K1, "NewValue1")); //////// ->
-                                                                                   //////// Enqueue
-                                                                                   //////// Message2
+    server1VM.invoke(() -> DurableClientStatsDUnitTest.putValue("NewValue1")); //////// ->
+    //////// Enqueue
+    //////// Message2
 
-    startAndCloseNonDurableClientCache(durableClientTimeout); //////// -> Reconnection3
+    startAndCloseNonDurableClientCache(); //////// -> Reconnection3
 
     server1VM.invoke(() -> DurableClientStatsDUnitTest
         .checkStatisticsWithExpectedValues(0, 0, 0));
@@ -133,9 +134,10 @@ public class DurableClientStatsDUnitTest extends JUnit4DistributedTestCase {
   @Test
   public void testDurableClientStatistics() {
 
+    assertThat(server1VM).isNotNull();
     // Step 1: Starting the servers
     PORT1 = server1VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+        .invoke(() -> createCacheServer(regionName, true));
     server1VM.invoke(DurableClientStatsDUnitTest::checkStatistics);
     // Step 2: Bring Up the Client
     // Start a durable client that is not kept alive on the server when it
@@ -152,9 +154,9 @@ public class DurableClientStatsDUnitTest extends JUnit4DistributedTestCase {
     startRegisterAndCloseDurableClientCache(durableClientTimeout);
     Wait.pause(500);
 
-    server1VM.invoke(() -> DurableClientStatsDUnitTest.putValue(K1, "Value1")); //////// ->
-                                                                                //////// Enqueue
-                                                                                //////// Message1
+    server1VM.invoke(() -> DurableClientStatsDUnitTest.putValue("Value1")); //////// ->
+    //////// Enqueue
+    //////// Message1
 
     Wait.pause(500);
     startAndCloseDurableClientCache(1); //////// -> Reconnection2
@@ -164,9 +166,9 @@ public class DurableClientStatsDUnitTest extends JUnit4DistributedTestCase {
     startRegisterAndCloseDurableClientCache(durableClientTimeout);
     Wait.pause(500);
 
-    server1VM.invoke(() -> DurableClientStatsDUnitTest.putValue(K1, "NewValue1")); //////// ->
-                                                                                   //////// Enqueue
-                                                                                   //////// Message2
+    server1VM.invoke(() -> DurableClientStatsDUnitTest.putValue("NewValue1")); //////// ->
+    //////// Enqueue
+    //////// Message2
 
     startAndCloseDurableClientCache(durableClientTimeout); //////// -> Reconnection3
 
@@ -177,49 +179,33 @@ public class DurableClientStatsDUnitTest extends JUnit4DistributedTestCase {
   public void startRegisterAndCloseDurableClientCache(int durableClientTimeout) {
     final String durableClientId = getName() + "_client";
 
-    durableClientVM
-        .invoke(() -> CacheServerTestUtil.createCacheClient(
-            getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, true,
-                0),
-            regionName,
-            getDurableClientDistributedSystemProperties(durableClientId, durableClientTimeout),
-            Boolean.TRUE));
+    durableClientVM.invoke(() -> createCacheClient(
+        getClientPool(NetworkUtils.getServerHostName(), PORT1), regionName,
+        getDurableClientDistributedSystemProperties(durableClientId, durableClientTimeout),
+        true));
+
+    durableClientVM.invoke(() -> DurableClientStatsDUnitTest.registerKey(true));
 
     // Send clientReady message
-    durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+    durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
-        CacheServerTestUtil.getCache().readyForEvents();
+        getClientCache().readyForEvents();
       }
     });
 
-    durableClientVM
-        .invoke(() -> DurableClientStatsDUnitTest.registerKey(K1, Boolean.TRUE));
-
     durableClientVM.invoke(DurableClientStatsDUnitTest::closeCache);
   }
 
-  public void startRegisterAndCloseNonDurableClientCache(int durableClientTimeout) {
-    final String durableClientId = getName() + "_client";
+  public void startRegisterAndCloseNonDurableClientCache() {
 
     durableClientVM
-        .invoke(() -> CacheServerTestUtil.createCacheClient(
-            getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, true,
-                0),
+        .invoke(() -> createCacheClient(
+            getClientPool(NetworkUtils.getServerHostName(), PORT1),
             regionName,
-            getNonDurableClientDistributedSystemProperties(durableClientId, durableClientTimeout),
-            Boolean.TRUE));
+            getNonDurableClientDistributedSystemProperties(), true));
 
-    // Send clientReady message
-    // this.durableClientVM.invoke(new CacheSerializableRunnable(
-    // "Send clientReady") {
-    // public void run2() throws CacheException {
-    // CacheServerTestUtil.getCache().readyForEvents();
-    // }
-    // });
-
-    durableClientVM
-        .invoke(() -> DurableClientStatsDUnitTest.registerKey(K1, Boolean.FALSE));
+    durableClientVM.invoke(() -> DurableClientStatsDUnitTest.registerKey(false));
 
     durableClientVM.invoke(DurableClientStatsDUnitTest::closeCache);
   }
@@ -229,18 +215,17 @@ public class DurableClientStatsDUnitTest extends JUnit4DistributedTestCase {
     final String durableClientId = getName() + "_client";
 
     durableClientVM
-        .invoke(() -> CacheServerTestUtil.createCacheClient(
-            getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, true,
-                0),
+        .invoke(() -> createCacheClient(
+            getClientPool(NetworkUtils.getServerHostName(), PORT1),
             regionName,
             getDurableClientDistributedSystemProperties(durableClientId, durableClientTimeout),
-            Boolean.TRUE));
+            true));
 
     // Send clientReady message
-    durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+    durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
-        CacheServerTestUtil.getCache().readyForEvents();
+        getClientCache().readyForEvents();
       }
     });
 
@@ -248,25 +233,11 @@ public class DurableClientStatsDUnitTest extends JUnit4DistributedTestCase {
 
   }
 
-  public void startAndCloseNonDurableClientCache(int durableClientTimeout) {
+  public void startAndCloseNonDurableClientCache() {
 
-    final String durableClientId = getName() + "_client";
-
-    durableClientVM
-        .invoke(() -> CacheServerTestUtil.createCacheClient(
-            getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, true,
-                0),
-            regionName,
-            getNonDurableClientDistributedSystemProperties(durableClientId, durableClientTimeout),
-            Boolean.TRUE));
-
-    // Send clientReady message
-    // this.durableClientVM.invoke(new CacheSerializableRunnable(
-    // "Send clientReady") {
-    // public void run2() throws CacheException {
-    // CacheServerTestUtil.getCache().readyForEvents();
-    // }
-    // });
+    durableClientVM.invoke(() -> createCacheClient(
+        getClientPool(NetworkUtils.getServerHostName(), PORT1), regionName,
+        getNonDurableClientDistributedSystemProperties(), true));
 
     durableClientVM.invoke(DurableClientStatsDUnitTest::closeCache);
 
@@ -274,10 +245,10 @@ public class DurableClientStatsDUnitTest extends JUnit4DistributedTestCase {
 
   public static void checkStatistics() {
     try {
-      Cache cache = CacheServerTestUtil.getCache();
+      Cache cache = getCache();
       org.apache.geode.LogWriter logger = cache.getLogger();
       CacheServerImpl currentServer =
-          (CacheServerImpl) (new ArrayList(cache.getCacheServers()).get(0));
+          (CacheServerImpl) (new ArrayList<>(cache.getCacheServers()).get(0));
       Acceptor ai = currentServer.getAcceptor();
       CacheClientNotifier notifier = ai.getCacheClientNotifier();
       CacheClientNotifierStats stats = notifier.getStats();
@@ -292,71 +263,63 @@ public class DurableClientStatsDUnitTest extends JUnit4DistributedTestCase {
   public static void checkStatisticsWithExpectedValues(int reconnectionCount, int queueDropCount,
       int enqueueCount) {
     try {
-      Cache cache = CacheServerTestUtil.getCache();
+      Cache cache = getCache();
       org.apache.geode.LogWriter logger = cache.getLogger();
       CacheServerImpl currentServer =
-          (CacheServerImpl) (new ArrayList(cache.getCacheServers()).get(0));
+          (CacheServerImpl) (new ArrayList<>(cache.getCacheServers()).get(0));
       Acceptor ai = currentServer.getAcceptor();
       CacheClientNotifier notifier = ai.getCacheClientNotifier();
       CacheClientNotifierStats stats = notifier.getStats();
       logger.info("Stats:" + "\nDurableReconnectionCount:" + stats.get_durableReconnectionCount()
           + "\nQueueDroppedCount" + stats.get_queueDroppedCount()
           + "\nEventsEnqueuedWhileClientAwayCount" + stats.get_eventEnqueuedWhileClientAwayCount());
-      assertEquals(reconnectionCount, stats.get_durableReconnectionCount());
-      assertEquals(queueDropCount, stats.get_queueDroppedCount());
-      assertEquals(enqueueCount, stats.get_eventEnqueuedWhileClientAwayCount());
+      await().untilAsserted(
+          () -> assertThat(stats.get_durableReconnectionCount()).isEqualTo(reconnectionCount));
+      await()
+          .untilAsserted(() -> assertThat(stats.get_queueDroppedCount()).isEqualTo(queueDropCount));
+      await().untilAsserted(
+          () -> assertThat(stats.get_eventEnqueuedWhileClientAwayCount()).isEqualTo(enqueueCount));
     } catch (Exception e) {
       fail("Exception thrown while executing checkStatisticsWithExpectedValues()", e);
     }
   }
 
   public static void closeCache() {
-    Cache cache = CacheServerTestUtil.getCache();
-    if (cache != null && !cache.isClosed()) {
+    ClientCache clientCache = getClientCache();
+    if (clientCache != null && !clientCache.isClosed()) {
       // might fail in DataSerializerRecoveryListener.RecoveryTask in shutdown
-      cache.getLogger().info("<ExpectedException action=add>"
+      clientCache.getLogger().info("<ExpectedException action=add>"
           + RejectedExecutionException.class.getName() + "</ExpectedException>");
-      cache.close(true);
-      cache.getDistributedSystem().disconnect();
+      clientCache.close(true);
     }
   }
 
-  private static void registerKey(String key, boolean isDurable) throws Exception {
-    try {
-      // Get the region
-      Region region = CacheServerTestUtil.getCache()
-          .getRegion(DurableClientStatsDUnitTest.class.getName() + "_region");
-      // Region region =
-      // CacheServerTestUtil.getCache().getRegion(regionName);
-      assertNotNull(region);
-      region.registerInterest(key, InterestResultPolicy.NONE, isDurable);
-    } catch (Exception ex) {
-      Assert.fail("failed while registering interest in registerKey function", ex);
-    }
+  private static void registerKey(boolean isDurable) {
+    // Get the region
+    Region<String, String> region = getClientCache()
+        .getRegion(DurableClientStatsDUnitTest.class.getName() + "_region");
+
+    assertThat(region).isNotNull();
+    region.registerInterest("Key1", InterestResultPolicy.NONE, isDurable);
   }
 
-  private static void putValue(String key, String value) {
-    try {
-      Region r = CacheServerTestUtil.getCache()
-          .getRegion(DurableClientStatsDUnitTest.class.getName() + "_region");
-      // Region r = CacheServerTestUtil.getCache().getRegion(regionName);
-      assertNotNull(r);
-      if (r.getEntry(key) != null) {
-        r.put(key, value);
-      } else {
-        r.create(key, value);
-      }
-      assertEquals(value, r.getEntry(key).getValue());
-    } catch (Exception e) {
-      fail("Put in Server has some fight", e);
+  private static void putValue(String value) {
+    Region<String, String> r = getClientCache()
+        .getRegion(DurableClientStatsDUnitTest.class.getName() + "_region");
+    assertThat(r).isNotNull();
+
+    if (r.getEntry("Key1") != null) {
+      r.put("Key1", value);
+    } else {
+      r.create("Key1", value);
     }
+    assertThat(r).contains(entry("Key1", value));
   }
 
-  private Pool getClientPool(String host, int server1Port, boolean establishCallbackConnection,
-      int redundancyLevel) {
+  private Pool getClientPool(String host, int server1Port) {
     PoolFactory pf = PoolManager.createFactory();
-    pf.addServer(host, server1Port).setSubscriptionEnabled(establishCallbackConnection)
-        .setSubscriptionRedundancy(redundancyLevel);
+    pf.addServer(host, server1Port).setSubscriptionEnabled(true)
+        .setSubscriptionRedundancy(0);
     return ((PoolFactoryImpl) pf).getPoolAttributes();
   }
 
@@ -370,8 +333,7 @@ public class DurableClientStatsDUnitTest extends JUnit4DistributedTestCase {
     return properties;
   }
 
-  private Properties getNonDurableClientDistributedSystemProperties(String durableClientId,
-      int durableClientTimeout) {
+  private Properties getNonDurableClientDistributedSystemProperties() {
     Properties properties = new Properties();
     properties.setProperty(MCAST_PORT, "0");
     properties.setProperty(LOCATORS, "");
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableRegistrationDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableRegistrationDistributedTest.java
index 627403fc38..43c0764ac2 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableRegistrationDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableRegistrationDistributedTest.java
@@ -462,6 +462,12 @@ public class DurableRegistrationDistributedTest extends JUnit4DistributedTestCas
             1),
         regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout)));
 
+    // Step 3: Client registers Interests
+    durableClientVM.invoke(() -> registerKey(K1, Boolean.FALSE));
+    durableClientVM.invoke(() -> registerKey(K2, Boolean.FALSE));
+    durableClientVM.invoke(() -> registerKey(K3, Boolean.TRUE));
+    durableClientVM.invoke(() -> registerKey(K4, Boolean.TRUE));
+
     // Send clientReady message
     durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable() {
       @Override
@@ -469,13 +475,6 @@ public class DurableRegistrationDistributedTest extends JUnit4DistributedTestCas
         getClientCache().readyForEvents();
       }
     });
-
-    // Step 3: Client registers Interests
-    durableClientVM.invoke(() -> registerKey(K1, Boolean.FALSE));
-    durableClientVM.invoke(() -> registerKey(K2, Boolean.FALSE));
-    durableClientVM.invoke(() -> registerKey(K3, Boolean.TRUE));
-    durableClientVM.invoke(() -> registerKey(K4, Boolean.TRUE));
-
     // Close Cache of the DurableClient
     durableClientVM.invoke(this::closeCache);
     GeodeAwaitility.await().until(() -> durableClientVM.invoke(() -> getClientCache().isClosed()));
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java
index 71aedefdf1..3cb722fb7c 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java
@@ -16,14 +16,17 @@ package org.apache.geode.internal.cache.tier.sockets;
 
 
 import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.ControlCqListener;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheClient;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheClientFromXml;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheServer;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheServerFromXml;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.getCache;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.getClientCache;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.getPool;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
-import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 import java.util.List;
 import java.util.Map;
@@ -50,15 +53,11 @@ import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.distributed.internal.ServerLocationAndMemberId;
-import org.apache.geode.internal.cache.ClientServerObserverAdapter;
-import org.apache.geode.internal.cache.ClientServerObserverHolder;
-import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.NetworkUtils;
 import org.apache.geode.test.dunit.SerializableRunnableIF;
 import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.WaitCriterion;
 import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
 
 @Category({ClientSubscriptionTest.class})
@@ -74,8 +73,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
     String lessThan5Query = "select * from " + SEPARATOR + regionName + " p where p.ID < 5";
 
     // Start a server
-    server1Port = server1VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+    server1Port = server1VM.invoke(() -> createCacheServer(regionName, true));
 
     // Start a durable client that is kept alive on the server when it stops
     // normally
@@ -99,9 +97,9 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
     startClient(publisherClientVM, server1Port, regionName);
 
     // Publish some entries
-    publishEntries(regionName, 10);
+    publishEntries(publisherClientVM, regionName, 10);
 
-    server1VM.invoke(new CacheSerializableRunnable("Close cq for durable client") {
+    server1VM.invoke("Close cq for durable client", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
 
@@ -169,17 +167,16 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
 
     // Start a server 1
     server1Port = server1VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+        .invoke(() -> createCacheServer(regionName, true));
 
     // Start server 2
-    server2Port = server2VM.invoke(CacheServerTestUtil.class,
-        "createCacheServer", new Object[] {regionName, Boolean.TRUE});
+    server2Port = server2VM.invoke(() -> createCacheServer(regionName, true));
 
     // Start a durable client that is kept alive on the server when it stops normally
     durableClientId = getName() + "_client";
-    CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(), server1Port, server2Port, true, 0),
-        regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE);
+    createCacheClient(
+        getClientPool(VM.getHostName(), server1Port, server2Port, true, 0),
+        regionName, getClientDistributedSystemProperties(durableClientId), true);
 
     // register non durable cq
     createCq("GreaterThan5", greaterThan5Query, false).execute();
@@ -189,7 +186,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
     createCq("LessThan5", lessThan5Query, true).execute();
 
     // send client ready
-    CacheServerTestUtil.getClientCache().readyForEvents();
+    getClientCache().readyForEvents();
 
     int oldPrimaryPort = getPrimaryServerPort();
     // Close the server that is hosting subscription queue
@@ -203,7 +200,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
     // Wait until failover to the another server is successfully performed
     waitForFailoverToPerform(oldPrimaryPort);
     primary = getPrimaryServerVM();
-    waitForDurableClientPresence(durableClientId, primary, 1);
+    waitForDurableClientPresence(durableClientId, primary);
     int primaryPort = getPrimaryServerPort();
 
     // Stop the durable client
@@ -213,13 +210,13 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
     startClient(publisherClientVM, primaryPort, regionName);
 
     // Publish some entries
-    publishEntries(regionName, 10);
+    publishEntries(publisherClientVM, regionName, 10);
 
     // Restart the durable client
-    CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(), primaryPort, true),
-        regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE);
-    assertThat(CacheServerTestUtil.getClientCache()).isNotNull();
+    createCacheClient(
+        getClientPool(VM.getHostName(), primaryPort, true),
+        regionName, getClientDistributedSystemProperties(durableClientId), true);
+    assertThat(getClientCache()).isNotNull();
 
     // Re-register non durable cq
     createCq("GreaterThan5", greaterThan5Query, false).execute();
@@ -229,7 +226,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
     createCq("LessThan5", lessThan5Query, true).execute();
 
     // send client ready
-    CacheServerTestUtil.getClientCache().readyForEvents();
+    getClientCache().readyForEvents();
 
     // verify cq events for all 3 cqs
     checkCqListenerEvents("GreaterThan5", 0 /* numEventsExpected */,
@@ -261,7 +258,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
 
     // Start server 1
     server1Port = server1VM.invoke(
-        () -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+        () -> createCacheServer(regionName, true));
 
     final String durableClientId = getName() + "_client";
 
@@ -285,7 +282,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
     startClient(publisherClientVM, server1Port, regionName);
 
     // Publish some entries
-    publishEntries(regionName, 10);
+    publishEntries(publisherClientVM, regionName, 10);
 
     closeCQsforDurableClient(durableClientId);
 
@@ -336,7 +333,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
 
     // Start server 1
     server1Port = server1VM.invoke(
-        () -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+        () -> createCacheServer(regionName, true));
 
     durableClientId = getName() + "_client";
 
@@ -359,7 +356,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
     startClient(publisherClientVM, server1Port, regionName);
 
     // Publish some entries
-    publishEntries(regionName, 10);
+    publishEntries(publisherClientVM, regionName, 10);
 
     closeCQsforDurableClient(durableClientId);
 
@@ -395,7 +392,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
     checkHAQueueSize(server1VM, durableClientId, 0, 1);
 
     // continue to publish and make sure we get the events
-    publishEntries(regionName, 10);
+    publishEntries(publisherClientVM, regionName, 10);
     checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /* numEventsExpected */,
         /* numEventsToWaitFor */ 10/* secondsToWait */);
     checkCqListenerEvents(durableClientVM, "LessThan5", 5 /* numEventsExpected */,
@@ -424,7 +421,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
   }
 
   private void closeCQsforDurableClient(String durableClientId) {
-    server1VM.invoke(new CacheSerializableRunnable("Close cq for durable client") {
+    server1VM.invoke("Close cq for durable client", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
 
@@ -453,7 +450,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
 
     // Start server 1
     server1Port = server1VM.invoke(
-        () -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+        () -> createCacheServer(regionName, true));
 
     final String durableClientId = getName() + "_client";
     final String durableClientId2 = getName() + "_client2";
@@ -481,7 +478,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
     sendClientReady(durableClientVM);
 
     // Verify 2nd durable client on server
-    server1VM.invoke(new CacheSerializableRunnable("Verify 2nd durable client") {
+    server1VM.invoke("Verify 2nd durable client", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
         // Find the proxy
@@ -495,9 +492,9 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
     startClient(publisherClientVM, server1Port, regionName);
 
     // Publish some entries
-    publishEntries(regionName, 10);
+    publishEntries(publisherClientVM, regionName, 10);
 
-    server1VM.invoke(new CacheSerializableRunnable("Close cq for durable client 1") {
+    server1VM.invoke("Close cq for durable client 1", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
 
@@ -587,7 +584,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
 
       // Start server 1
       server1Port = server1VM.invoke(
-          () -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+          () -> createCacheServer(regionName, true));
 
       final String durableClientId = getName() + "_client";
       durableClientVM.invoke(CacheServerTestUtil::disableShufflingOfEndpoints);
@@ -611,9 +608,9 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
       startClient(publisherClientVM, server1Port, regionName);
 
       // Publish some entries
-      publishEntries(regionName, 10);
+      publishEntries(publisherClientVM, regionName, 10);
 
-      server1VM.invoke(new CacheSerializableRunnable("Set test hook") {
+      server1VM.invoke("Set test hook", new CacheSerializableRunnable() {
         @Override
         public void run2() throws CacheException {
           // Set the Test Hook!
@@ -622,7 +619,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
         }
       });
 
-      server1VM.invokeAsync(new CacheSerializableRunnable("Close cq for durable client") {
+      server1VM.invokeAsync("Close cq for durable client", new CacheSerializableRunnable() {
         @Override
         public void run2() throws CacheException {
 
@@ -639,15 +636,16 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
       // Restart the durable client
       startDurableClient(durableClientVM, durableClientId, server1Port, regionName);
 
-      server1VM.invoke(new CacheSerializableRunnable("verify was rejected at least once") {
+      server1VM.invoke("verify was rejected at least once", new CacheSerializableRunnable() {
         @Override
         public void run2() throws CacheException {
           await()
               .until(() -> CacheClientProxy.testHook != null
                   && (((RejectClientReconnectTestHook) CacheClientProxy.testHook)
                       .wasClientRejected()));
-          assertTrue(
-              ((RejectClientReconnectTestHook) CacheClientProxy.testHook).wasClientRejected());
+          assertThat(
+              ((RejectClientReconnectTestHook) CacheClientProxy.testHook).wasClientRejected())
+                  .isTrue();
         }
       });
 
@@ -684,7 +682,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
       // Stop the server
       server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
     } finally {
-      server1VM.invoke(new CacheSerializableRunnable("unset test hook") {
+      server1VM.invoke("unset test hook", new CacheSerializableRunnable() {
         @Override
         public void run2() throws CacheException {
           CacheClientProxy.testHook = null;
@@ -706,7 +704,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
 
       // Start server 1
       server1Port = server1VM.invoke(
-          () -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+          () -> createCacheServer(regionName, true));
 
       durableClientId = getName() + "_client";
 
@@ -729,10 +727,10 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
       startClient(publisherClientVM, server1Port, regionName);
 
       // Publish some entries
-      publishEntries(regionName, 10);
+      publishEntries(publisherClientVM, regionName, 10);
 
-      AsyncInvocation async =
-          server1VM.invokeAsync(new CacheSerializableRunnable("Close cq for durable client") {
+      AsyncInvocation<Void> async =
+          server1VM.invokeAsync("Close cq for durable client", new CacheSerializableRunnable() {
             @Override
             public void run2() throws CacheException {
 
@@ -775,9 +773,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
       // send client ready
       sendClientReady(durableClientVM);
 
-      async.join();
-      assertFalse(async.getException() != null ? async.getException().toString() : "No error",
-          async.exceptionOccurred());
+      async.await();
 
       // verify cq listener events
       checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /* numEventsExpected */,
@@ -796,7 +792,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
       // Stop the server
       server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
     } finally {
-      server1VM.invoke(new CacheSerializableRunnable("unset test hook") {
+      server1VM.invoke("unset test hook", new CacheSerializableRunnable() {
         @Override
         public void run2() throws CacheException {
           CacheClientProxy.testHook = null;
@@ -816,7 +812,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
 
     // Start a server
     server1Port = server1VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+        .invoke(() -> createCacheServer(regionName, true));
 
     // Start a durable client that is kept alive on the server when it stops
     // normally
@@ -836,10 +832,10 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
     startClient(publisherClientVM, server1Port, regionName);
 
     // Publish some entries
-    publishEntries(regionName, 10);
+    publishEntries(publisherClientVM, regionName, 10);
 
     // Attempt to close a cq even though the client is running
-    server1VM.invoke(new CacheSerializableRunnable("Close cq for durable client") {
+    server1VM.invoke("Close cq for durable client", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
 
@@ -891,19 +887,19 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
 
     // Start server 1
     server1Port = server1VM.invoke(CacheServerTestUtil.class,
-        "createCacheServer", new Object[] {regionName, Boolean.TRUE});
+        "createCacheServer", new Object[] {regionName, true});
 
     // Start server 2
     final int server2Port = server2VM.invoke(CacheServerTestUtil.class,
-        "createCacheServer", new Object[] {regionName, Boolean.TRUE});
+        "createCacheServer", new Object[] {regionName, true});
 
     // Start a durable client
     durableClientId = getName() + "_client";
     durableClientVM.invoke(() -> {
-      CacheServerTestUtil
-          .createCacheClient(getClientPool(getServerHostName(), server1Port, server2Port,
-              true, 0),
-              regionName, getClientDistributedSystemProperties(durableClientId, 60), Boolean.TRUE);
+      createCacheClient(getClientPool(NetworkUtils.getServerHostName(),
+          server1Port, server2Port,
+          true, 0),
+          regionName, getClientDistributedSystemProperties(durableClientId, 60), true);
 
     });
 
@@ -929,19 +925,19 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
   }
 
   private static List<String> getAllDurableCqsFromServer() throws CqException {
-    QueryService queryService = CacheServerTestUtil.getPool().getQueryService();
+    QueryService queryService = getPool().getQueryService();
     return queryService.getAllDurableCqsFromServer();
   }
 
   private static void verifyDurableCqs(final List<String> durableCqNames,
       final String registeredCqName) {
     // Verify the number of durable CQ names is one, and it matches the registered name
-    assertEquals(1, durableCqNames.size());
+    assertThat(durableCqNames).hasSize(1);
     String returnedCqName = durableCqNames.get(0);
-    assertEquals(registeredCqName, returnedCqName);
+    assertThat(registeredCqName).isEqualTo(returnedCqName);
 
     // Get client's primary server
-    PoolImpl pool = CacheServerTestUtil.getPool();
+    PoolImpl pool = getPool();
     ServerLocation primaryServerLocation = pool.getPrimary();
 
     // Verify the primary server was used and no other server was used
@@ -950,7 +946,7 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
     for (Map.Entry<ServerLocationAndMemberId, ConnectionStats> entry : statistics.entrySet()) {
       int expectedGetDurableCqInvocations =
           entry.getKey().getServerLocation().equals(primaryServerLocation) ? 1 : 0;
-      assertEquals(expectedGetDurableCqInvocations, entry.getValue().getGetDurableCqs());
+      assertThat(entry.getValue().getGetDurableCqs()).isEqualTo(expectedGetDurableCqInvocations);
     }
   }
 
@@ -960,123 +956,92 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
    */
   @Test
   public void testReadyForEventsNotCalledImplicitlyWithCacheXML() throws InterruptedException {
-    try {
-      setPeriodicACKObserver(durableClientVM);
-      final String cqName = "cqTest";
-      // Start a server
-      server1Port =
-          server1VM.invoke(() -> CacheServerTestUtil.createCacheServerFromXml(
-              DurableClientTestBase.class.getResource("durablecq-server-cache.xml")));
-
-      // Start a durable client that is not kept alive on the server when it
-      // stops normally
-      final String durableClientId = getName() + "_client";
 
-      // create client cache from xml
-      durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClientFromXml(
-          DurableClientTestBase.class.getResource("durablecq-client-cache.xml"), "client",
-          durableClientId, VERY_LONG_DURABLE_TIMEOUT_SECONDS, Boolean.FALSE));
+    final String cqName = "cqTest";
+    // Start a server
+    server1Port =
+        server1VM.invoke(() -> createCacheServerFromXml(
+            DurableClientTestBase.class.getResource("durablecq-server-cache.xml")));
 
-      // verify that readyForEvents has not yet been called on all the client's pools
-      durableClientVM.invoke(new CacheSerializableRunnable("check readyForEvents not called") {
-        @Override
-        public void run2() throws CacheException {
-          for (Pool p : PoolManager.getAll().values()) {
-            assertFalse(((PoolImpl) p).getReadyForEventsCalled());
-          }
-        }
-      });
+    // Start a durable client that is not kept alive on the server when it
+    // stops normally
+    final String durableClientId = getName() + "_client";
 
-      // Send clientReady message
-      sendClientReady(durableClientVM);
-      registerDurableCq(cqName);
+    // create client cache from xml
+    durableClientVM.invoke(() -> createCacheClientFromXml(
+        DurableClientTestBase.class.getResource("durablecq-client-cache.xml"), "client",
+        durableClientId, VERY_LONG_DURABLE_TIMEOUT_SECONDS, false));
 
-      // Verify durable client on server1
-      verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server1VM);
+    // verify that readyForEvents has not yet been called on all the client's pools
+    durableClientVM.invoke("check readyForEvents not called", new CacheSerializableRunnable() {
+      @Override
+      public void run2() throws CacheException {
+        for (Pool p : PoolManager.getAll().values()) {
+          assertThat(((PoolImpl) p).getReadyForEventsCalled()).isFalse();
+        }
+      }
+    });
 
-      // Start normal publisher client
-      publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-          getClientPool(getServerHostName(), server1Port, false),
-          regionName));
+    // Send clientReady message
+    sendClientReady(durableClientVM);
+    registerDurableCq(cqName);
 
-      // Publish some entries
-      final int numberOfEntries = 10;
-      publishEntries(0, numberOfEntries);
+    // Verify durable client on server1
+    verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server1VM);
 
-      // Verify the durable client received the updates
-      checkCqListenerEvents(durableClientVM, cqName, numberOfEntries,
-          VERY_LONG_DURABLE_TIMEOUT_SECONDS);
+    // Start normal publisher client
+    publisherClientVM.invoke(() -> createCacheClient(
+        getClientPool(publisherClientVM.getHost().getHostName(), server1Port,
+            false),
+        regionName));
 
-      Thread.sleep(10000);
+    // Publish some entries
+    final int numberOfEntries = 10;
+    publishEntries(publisherClientVM, 0, numberOfEntries);
 
-      // Stop the durable client
-      durableClientVM.invoke(() -> CacheServerTestUtil.closeCache(Boolean.TRUE));
+    // Verify the durable client received the updates
+    checkCqListenerEvents(durableClientVM, cqName, numberOfEntries,
+        VERY_LONG_DURABLE_TIMEOUT_SECONDS);
 
-      // Verify the durable client still exists on the server
-      verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId,
-          server1VM);
+    Thread.sleep(10000);
 
-      // Publish some more entries
-      publishEntries(10, numberOfEntries);
+    // Stop the durable client
+    durableClientVM.invoke(() -> CacheServerTestUtil.closeCache(true));
 
-      publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
+    // Verify the durable client still exists on the server
+    verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId,
+        server1VM);
 
-      // Re-start the durable client
-      durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClientFromXml(
-          DurableClientTestBase.class.getResource("durablecq-client-cache.xml"), "client",
-          durableClientId, VERY_LONG_DURABLE_TIMEOUT_SECONDS, Boolean.FALSE));
+    // Publish some more entries
+    publishEntries(publisherClientVM, 10, numberOfEntries);
 
-      // Durable client registers durable cq on server
-      registerDurableCq(cqName);
+    publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
 
-      // Send clientReady message
-      sendClientReady(durableClientVM);
+    // Re-start the durable client
+    durableClientVM.invoke(() -> createCacheClientFromXml(
+        DurableClientTestBase.class.getResource("durablecq-client-cache.xml"), "client",
+        durableClientId, VERY_LONG_DURABLE_TIMEOUT_SECONDS, false));
 
-      // Verify durable client on server
-      verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server1VM);
+    // Durable client registers durable cq on server
+    registerDurableCq(cqName);
 
-      // Verify the durable client received the updates held for it on the server
-      checkCqListenerEvents(durableClientVM, cqName, 10, VERY_LONG_DURABLE_TIMEOUT_SECONDS);
+    // Send clientReady message
+    sendClientReady(durableClientVM);
 
-      // Stop the durable client
-      durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
+    // Verify durable client on server
+    verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server1VM);
 
-      // Stop the server
-      server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
-    } finally {
-      unsetPeriodicACKObserver(durableClientVM);
-    }
-  }
+    // Verify the durable client received the updates held for it on the server
+    checkCqListenerEvents(durableClientVM, cqName, 10, VERY_LONG_DURABLE_TIMEOUT_SECONDS);
 
-  private void setPeriodicACKObserver(VM vm) {
-    CacheSerializableRunnable cacheSerializableRunnable =
-        new CacheSerializableRunnable("Set ClientServerObserver") {
-          @Override
-          public void run2() throws CacheException {
-            PoolImpl.BEFORE_SENDING_CLIENT_ACK_CALLBACK_FLAG = true;
-            ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() {
-              @Override
-              public void beforeSendingClientAck() {
-                // logger.info("beforeSendingClientAck invoked");
+    // Stop the durable client
+    durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
 
-              }
-            });
+    // Stop the server
+    server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
 
-          }
-        };
-    vm.invoke(cacheSerializableRunnable);
   }
 
-  private void unsetPeriodicACKObserver(VM vm) {
-    CacheSerializableRunnable cacheSerializableRunnable =
-        new CacheSerializableRunnable("Unset ClientServerObserver") {
-          @Override
-          public void run2() throws CacheException {
-            PoolImpl.BEFORE_SENDING_CLIENT_ACK_CALLBACK_FLAG = false;
-          }
-        };
-    vm.invoke(cacheSerializableRunnable);
-  }
 
   public VM getPrimaryServerVM() {
     if (server1Port == getPrimaryServerPort()) {
@@ -1087,49 +1052,37 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase {
   }
 
   public int getPrimaryServerPort() {
-    PoolImpl pool = CacheServerTestUtil.getPool();
+    PoolImpl pool = getPool();
     ServerLocation primaryServerLocation = pool.getPrimary();
     return primaryServerLocation.getPort();
   }
 
   public void waitForFailoverToPerform(int oldPrimaryPort) {
-    final PoolImpl pool = CacheServerTestUtil.getPool();
-    WaitCriterion ev = new WaitCriterion() {
-      @Override
-      public boolean done() {
-        return pool.getPrimary() != null && pool.getPrimary().getPort() != oldPrimaryPort;
-      }
-
-      @Override
-      public String description() {
-        return null;
-      }
-    };
-
-    GeodeAwaitility.await().untilAsserted(ev);
-    assertNotNull(pool.getPrimary());
+    final PoolImpl pool = getPool();
+    await().until(() -> pool.getPrimary() != null && pool.getPrimary().getPort() != oldPrimaryPort);
+    assertThat(pool.getPrimary()).isNotNull();
   }
 
   void registerDurableCq(final String cqName) {
     // Durable client registers durable cq on server
-    durableClientVM.invoke(new CacheSerializableRunnable("Register Cq") {
+    durableClientVM.invoke("Register Cq", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
         // Get the region
-        Region<Object, Object> region = CacheServerTestUtil.getCache().getRegion(regionName);
-        assertNotNull(region);
+        Region<Object, Object> region = getCache().getRegion(regionName);
+        assertThat(region).isNotNull();
 
         // Create CQ Attributes.
         CqAttributesFactory cqAf = new CqAttributesFactory();
 
         // Initialize and set CqListener.
-        CqListener[] cqListeners = {new CacheServerTestUtil.ControlCqListener()};
+        CqListener[] cqListeners = {new ControlCqListener()};
         cqAf.initCqListeners(cqListeners);
         CqAttributes cqa = cqAf.create();
 
         // Create cq's
         // Get the query service for the Pool
-        QueryService queryService = CacheServerTestUtil.getPool().getQueryService();
+        QueryService queryService = getPool().getQueryService();
 
         try {
           CqQuery query =
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientHAQueuedDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientHAQueuedDUnitTest.java
index 3677e589af..fa24d3daf2 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientHAQueuedDUnitTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientHAQueuedDUnitTest.java
@@ -65,7 +65,7 @@ public class DurableClientHAQueuedDUnitTest extends DurableClientTestBase {
     startClient(publisherClientVM, server1Port, regionName);
 
     // Publish some entries
-    publishEntries(regionName, 10);
+    publishEntries(publisherClientVM, regionName, 10);
 
     // verify cq stats are correct
     checkNumDurableCqs(server1VM, durableClientId, 3);
@@ -148,7 +148,7 @@ public class DurableClientHAQueuedDUnitTest extends DurableClientTestBase {
     startClient(publisherClientVM, server1Port, regionName);
 
     // Publish some entries
-    publishEntries(regionName, 10);
+    publishEntries(publisherClientVM, regionName, 10);
 
     // verify cq stats are correct
     checkNumDurableCqs(server1VM, durableClientId, 3);
@@ -246,7 +246,7 @@ public class DurableClientHAQueuedDUnitTest extends DurableClientTestBase {
     startClient(publisherClientVM, server1Port, regionName);
 
     // Publish some entries
-    publishEntries(regionName, 10);
+    publishEntries(publisherClientVM, regionName, 10);
 
     // Restart the durable client
     startDurableClient(durableClientVM, durableClientId, server1Port, server2Port, regionName);
@@ -338,7 +338,7 @@ public class DurableClientHAQueuedDUnitTest extends DurableClientTestBase {
     startClient(publisherClientVM, server1Port, regionName);
 
     // Publish some entries
-    publishEntries(regionName, 10);
+    publishEntries(publisherClientVM, regionName, 10);
 
     // verify cq stats are correct
     checkNumDurableCqs(server1VM, durableClientId, 3);
@@ -429,7 +429,7 @@ public class DurableClientHAQueuedDUnitTest extends DurableClientTestBase {
     startClient(publisherClientVM, server1Port, regionName);
 
     // Publish some entries
-    publishEntries(regionName, 10);
+    publishEntries(publisherClientVM, regionName, 10);
 
     // verify cq stats are correct on both servers
     checkNumDurableCqs(server1VM, durableClientId, 3);
@@ -538,7 +538,7 @@ public class DurableClientHAQueuedDUnitTest extends DurableClientTestBase {
     startClient(publisherClientVM, server1Port, regionName);
 
     // Publish some entries
-    publishEntries(regionName, 10);
+    publishEntries(publisherClientVM, regionName, 10);
 
     // Re-start server2, should get events through gii
     server2VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE,
@@ -636,7 +636,7 @@ public class DurableClientHAQueuedDUnitTest extends DurableClientTestBase {
     startClient(publisherClientVM, server1Port, regionName);
 
     // Publish some entries
-    publishEntries(regionName, 10);
+    publishEntries(publisherClientVM, regionName, 10);
 
     // Restart the durable client
     startDurableClient(durableClientVM, durableClientId, server1Port, server2Port, regionName);
@@ -747,7 +747,7 @@ public class DurableClientHAQueuedDUnitTest extends DurableClientTestBase {
     startClient(publisherClientVM, server1Port, regionName);
 
     // Publish some entries
-    publishEntries(regionName, 10);
+    publishEntries(publisherClientVM, regionName, 10);
 
     // Re-start server2, should get events through gii
     server2VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE,
@@ -848,7 +848,7 @@ public class DurableClientHAQueuedDUnitTest extends DurableClientTestBase {
     startClient(publisherClientVM, server1Port, regionName);
 
     // Publish some entries
-    publishEntries(regionName, 10);
+    publishEntries(publisherClientVM, regionName, 10);
 
     closeCache(server1VM);
 
@@ -933,7 +933,7 @@ public class DurableClientHAQueuedDUnitTest extends DurableClientTestBase {
     startClient(publisherClientVM, server1Port, regionName);
 
     // Publish some entries
-    publishEntries(regionName, 10);
+    publishEntries(publisherClientVM, regionName, 10);
 
     // verify cq stats are correct
     checkNumDurableCqs(server1VM, durableClientId, 3);
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
index b37facfb60..d08000b111 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
@@ -14,27 +14,25 @@
  */
 package org.apache.geode.internal.cache.tier.sockets;
 
-import static java.lang.Boolean.FALSE;
-import static java.lang.Boolean.TRUE;
 import static java.lang.Thread.sleep;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.geode.cache.InterestResultPolicy.NONE;
 import static org.apache.geode.cache.Region.SEPARATOR;
-import static org.apache.geode.cache.client.PoolManager.createFactory;
-import static org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
 import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.TYPE_CREATE;
 import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheClient;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheClientFromXmlN;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheClients;
 import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheServer;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheServerFromXmlN;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createClientCache;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.getCache;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.getClientCache;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName;
 import static org.apache.geode.test.dunit.Wait.pause;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -44,11 +42,9 @@ import org.apache.geode.cache.InterestResultPolicy;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.client.Pool;
 import org.apache.geode.cache.client.PoolManager;
-import org.apache.geode.cache.client.ServerRefusedConnectionException;
 import org.apache.geode.cache.client.internal.PoolImpl;
 import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.internal.cache.PoolFactoryImpl;
 import org.apache.geode.internal.cache.ha.HARegionQueue;
 import org.apache.geode.internal.cache.ha.HARegionQueueStats;
 import org.apache.geode.test.dunit.SerializableRunnableIF;
@@ -65,14 +61,14 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
   public void testSimpleDurableClientUpdate() {
     // Start a server
     server1Port = server1VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+        .invoke(() -> createCacheServer(regionName, true));
 
     // Start a durable client that is not kept alive on the server when it stops
     // normally
     final String durableClientId = getName() + "_client";
-    durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
+    durableClientVM.invoke(() -> createCacheClient(
         getClientPool(getServerHostName(), server1Port, true), regionName,
-        getClientDistributedSystemProperties(durableClientId), Boolean.TRUE));
+        getClientDistributedSystemProperties(durableClientId), true));
 
     // Send clientReady message
     sendClientReady(durableClientVM);
@@ -80,13 +76,13 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     registerInterest(durableClientVM, regionName, true, InterestResultPolicy.NONE);
 
     // Start normal publisher client
-    publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
+    publisherClientVM.invoke(() -> createCacheClient(
         getClientPool(getServerHostName(), server1Port, false),
         regionName));
 
     // Publish some entries
     final int numberOfEntries = 10;
-    publishEntries(0, 10);
+    publishEntries(publisherClientVM, 0, 10);
 
     // Verify the durable client received the updates
     checkListenerEvents(numberOfEntries, 1, -1, durableClientVM);
@@ -107,29 +103,28 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
   @Test
   public void testMultipleBridgeClientsInSingleDurableVM() {
     // Start a server
-    server1Port = server1VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+    server1Port = server1VM.invoke(() -> createCacheServer(regionName, true));
 
     // Start a durable client with 2 regions (and 2 BridgeClients) that is not
     // kept alive on the server when it stops normally
     final String durableClientId = getName() + "_client";
     final String regionName1 = regionName + "1";
     final String regionName2 = regionName + "2";
-    durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClients(
+    durableClientVM.invoke(() -> createCacheClients(
         getClientPool(getServerHostName(), server1Port, true), regionName1,
         regionName2, getClientDistributedSystemProperties(durableClientId)));
 
     // Send clientReady message
-    durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+    durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
-        assertEquals(2, PoolManager.getAll().size());
-        CacheServerTestUtil.getClientCache().readyForEvents();
+        assertThat(PoolManager.getAll()).hasSize(2);
+        getClientCache().readyForEvents();
       }
     });
 
     // Verify durable clients on server
-    server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+    server1VM.invoke("Verify durable client", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
         // Get the CacheClientNotifier
@@ -139,16 +134,16 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
         checkNumberOfClientProxies(2);
         String firstProxyRegionName = null;
         for (CacheClientProxy proxy : notifier.getClientProxies()) {
-          assertTrue(proxy.isDurable());
-          assertEquals(durableClientId, proxy.getDurableId());
-          assertEquals(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT,
-              proxy.getDurableTimeout());
+          assertThat(proxy.isDurable()).isTrue();
+          assertThat(proxy.getDurableId()).isEqualTo(durableClientId);
+          assertThat(proxy.getDurableTimeout())
+              .isEqualTo(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT);
 
           // Verify the two HA region names aren't the same
           if (firstProxyRegionName == null) {
             firstProxyRegionName = proxy.getHARegionName();
           } else {
-            assertTrue(!firstProxyRegionName.equals(proxy.getHARegionName()));
+            assertThat(proxy.getHARegionName()).isNotEqualTo(firstProxyRegionName);
           }
         }
       }
@@ -158,7 +153,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
 
     // Verify the durable client is no longer on the server
-    server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+    server1VM.invoke("Verify durable client", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
         // Find the proxy
@@ -170,75 +165,6 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
   }
 
-  /**
-   * Test that a second VM with the same durable id cannot connect to the server while the first VM
-   * is connected. Also, verify that the first client is not affected by the second one attempting
-   * to connect.
-   */
-  @Ignore("TODO: test is disabled")
-  @Test
-  public void testMultipleVMsWithSameDurableId() {
-    // Start a server
-    server1Port = server1VM
-        .invoke(() -> createCacheServer(regionName, TRUE));
-
-    // Start a durable client that is not kept alive on the server when it
-    // stops normally
-    final String durableClientId = getName() + "_client";
-    durableClientVM.invoke(() -> createCacheClient(
-        getClientPool(getServerHostName(), server1Port, true), regionName,
-        getClientDistributedSystemProperties(durableClientId), TRUE));
-
-    // Send clientReady message
-    sendClientReady(durableClientVM);
-
-    registerInterest(durableClientVM, regionName, true, NONE);
-
-    // Attempt to start another durable client VM with the same id.
-    publisherClientVM.invoke(new CacheSerializableRunnable("Create another durable client") {
-      @Override
-      public void run2() throws CacheException {
-        getSystem(getClientDistributedSystemProperties(durableClientId));
-        PoolFactoryImpl pf = (PoolFactoryImpl) createFactory();
-        pf.init(getClientPool(getServerHostName(), server1Port, true));
-        try {
-          pf.create("uncreatablePool");
-          fail("Should not have been able to create the pool");
-        } catch (ServerRefusedConnectionException expected) {
-          // expected exception
-          disconnectFromDS();
-        } catch (Exception e) {
-          fail("Should not have gotten here", e);
-        }
-      }
-    });
-
-    // Verify durable client on server
-    verifyDurableClientPresent(DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
-        server1VM);
-
-    // Start normal publisher client
-    publisherClientVM.invoke(() -> createCacheClient(
-        getClientPool(getServerHostName(), server1Port, false),
-        regionName));
-
-    // Publish some entries
-    final int numberOfEntries = 10;
-    publishEntries(0, numberOfEntries);
-
-    // Verify the durable client received the updates
-    checkListenerEvents(numberOfEntries, 1, -1, durableClientVM);
-
-    // Stop the publisher client
-    publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
-
-    // Stop the durable client
-    durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
-
-    // Stop the server
-    server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
-  }
-
   /**
    * Test that the server correctly processes starting two durable clients.
    */
@@ -246,12 +172,12 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
   public void testSimpleTwoDurableClients() {
     // Start a server
     server1Port = server1VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+        .invoke(() -> createCacheServer(regionName, true));
 
     // Start a durable client that is not kept alive on the server when it
     // stops normally
     final String durableClientId = getName() + "_client";
-    durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
+    durableClientVM.invoke(() -> createCacheClient(
         getClientPool(getServerHostName(), server1Port, true), regionName,
         getClientDistributedSystemProperties(durableClientId)));
 
@@ -262,7 +188,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     // it stops normally. Use the 'publisherClientVM' as a durable client.
     VM durableClient2VM = publisherClientVM;
     final String durableClientId2 = getName() + "_client2";
-    durableClient2VM.invoke(() -> CacheServerTestUtil.createCacheClient(
+    durableClient2VM.invoke(() -> createCacheClient(
         getClientPool(getServerHostName(), server1Port, true), regionName,
         getClientDistributedSystemProperties(durableClientId2)));
 
@@ -270,7 +196,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     sendClientReady(durableClient2VM);
 
     // Verify durable clients on server
-    server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+    server1VM.invoke("Verify durable client", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
         // Get the CacheClientNotifier
@@ -280,18 +206,18 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
         checkNumberOfClientProxies(2);
         boolean durableClient1Found = false, durableClient2Found = false;
         for (CacheClientProxy proxy : notifier.getClientProxies()) {
-          assertTrue(proxy.isDurable());
+          assertThat(proxy.isDurable()).isTrue();
           if (proxy.getDurableId().equals(durableClientId)) {
             durableClient1Found = true;
           }
           if (proxy.getDurableId().equals(durableClientId2)) {
             durableClient2Found = true;
           }
-          assertEquals(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT,
-              proxy.getDurableTimeout());
+          assertThat(proxy.getDurableTimeout())
+              .isEqualTo(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT);
         }
-        assertTrue(durableClient1Found);
-        assertTrue(durableClient2Found);
+        assertThat(durableClient1Found).isTrue();
+        assertThat(durableClient2Found).isTrue();
       }
     });
 
@@ -303,94 +229,6 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
   }
 
-  /**
-   * Test that starting a durable client on multiple servers (one live and one not live) is
-   * processed correctly.
-   */
-  @Ignore("TODO: test is disabled for bug 52043")
-  @Test
-  public void testDurableClientMultipleServersOneLive() throws InterruptedException {
-    // Start server 1
-    server1Port = server1VM
-        .invoke(() -> createCacheServer(regionName, TRUE));
-
-    // Start server 2
-    final int server2Port = server2VM
-        .invoke(() -> createCacheServer(regionName, TRUE));
-
-    // Stop server 2
-    server2VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
-
-    // Start a durable client that is kept alive on the server when it stops
-    // normally
-    final String durableClientId = getName() + "_client";
-    final int durableClientTimeout = 60; // keep the client alive for 60 seconds
-    // final boolean durableClientKeepAlive = true; // keep the client alive when it stops normally
-    durableClientVM.invoke(() -> createCacheClient(
-        getClientPool(getServerHostName(), server1Port, server2Port, true),
-        regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout),
-        TRUE));
-
-    // Send clientReady message
-    sendClientReady(durableClientVM);
-
-    registerInterest(durableClientVM, regionName, true, NONE);
-
-    // Verify durable client on server1
-    verifyDurableClientPresent(durableClientTimeout, durableClientId, server1VM);
-    // Start normal publisher client
-    publisherClientVM.invoke(() -> createCacheClient(getClientPool(getServerHostName(),
-        server1Port, server2Port, false), regionName));
-
-    // Publish some entries
-    final int numberOfEntries = 10;
-    publishEntries(0, numberOfEntries);
-
-    // Verify the durable client received the updates
-    checkListenerEvents(numberOfEntries, 1, -1, durableClientVM);
-
-    sleep(10000);
-
-    // Stop the durable client
-    durableClientVM.invoke(() -> CacheServerTestUtil.closeCache(TRUE));
-
-    // Verify the durable client still exists on the server
-    server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
-      @Override
-      public void run2() throws CacheException {
-        // Find the proxy
-        CacheClientProxy proxy = getClientProxy();
-        assertNotNull(proxy);
-      }
-    });
-
-    // Publish some more entries
-    publishEntries(10, numberOfEntries);
-
-    // Re-start the durable client
-    durableClientVM.invoke(() -> createCacheClient(
-        getClientPool(getServerHostName(), server1Port, server2Port, true),
-        regionName, getClientDistributedSystemProperties(durableClientId), TRUE));
-
-    // Send clientReady message
-    sendClientReady(durableClientVM);
-
-    // Verify durable client on server
-    verifyDurableClientPresent(durableClientTimeout, durableClientId, server1VM);
-
-    // Verify the durable client received the updates held for it on the server
-    checkListenerEvents(numberOfEntries, 1, -1, durableClientVM);
-
-    // Stop the durable client
-    durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
-
-    // Stop the publisher client
-    publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
-
-    // Stop server 1
-    server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
-  }
-
   /**
    * Test that updates to two durable clients are processed correctly.
    */
@@ -398,7 +236,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
   public void testTwoDurableClientsStartStopUpdate() throws InterruptedException {
     // Start a server
     server1Port = server1VM
-        .invoke(() -> createCacheServer(regionName, TRUE));
+        .invoke(() -> createCacheServer(regionName, true));
 
     // Start a durable client that is kept alive on the server when it stops
     // normally
@@ -408,7 +246,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     durableClientVM.invoke(() -> createCacheClient(
         getClientPool(getServerHostName(), server1Port, true), regionName,
         getClientDistributedSystemProperties(durableClientId, durableClientTimeout),
-        TRUE));
+        true));
 
     // Send clientReady message
     sendClientReady(durableClientVM);
@@ -422,7 +260,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     durableClient2VM.invoke(() -> createCacheClient(
         getClientPool(getServerHostName(), server1Port, true), regionName,
         getClientDistributedSystemProperties(durableClientId2, durableClientTimeout),
-        TRUE));
+        true));
 
     // Send clientReady message
     sendClientReady(durableClient2VM);
@@ -430,7 +268,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     registerInterest(durableClient2VM, regionName, true, NONE);
 
     // Verify durable clients on server
-    verifyMultupleDurableClients(durableClientId, durableClientTimeout, durableClientId2);
+    verifyMultupleDurableClients(durableClientId, durableClientId2);
 
     // Start normal publisher client
     publisherClientVM.invoke(() -> createCacheClient(
@@ -439,7 +277,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
 
     // Publish some entries
     final int numberOfEntries = 10;
-    publishEntries(0, numberOfEntries);
+    publishEntries(publisherClientVM, 0, numberOfEntries);
 
     // Verify durable client 1 received the updates
     checkListenerEvents(numberOfEntries, 1, -1, durableClientVM);
@@ -451,19 +289,19 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     sleep(1000);
 
     // Stop the durable clients
-    durableClientVM.invoke(() -> CacheServerTestUtil.closeCache(TRUE));
-    durableClient2VM.invoke(() -> CacheServerTestUtil.closeCache(TRUE));
+    durableClientVM.invoke(() -> CacheServerTestUtil.closeCache(true));
+    durableClient2VM.invoke(() -> CacheServerTestUtil.closeCache(true));
 
     // Verify the durable clients still exist on the server
-    verifyMultupleDurableClients(durableClientId, durableClientTimeout, durableClientId2);
+    verifyMultupleDurableClients(durableClientId, durableClientId2);
 
     // Publish some more entries
-    publishEntries(10, numberOfEntries);
+    publishEntries(publisherClientVM, 10, numberOfEntries);
 
     sleep(1000);
 
     // Verify the durable clients' queues contain the entries
-    server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+    server1VM.invoke("Verify durable client", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
         // Get the CacheClientNotifier
@@ -472,7 +310,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
         // Iterate the CacheClientProxies and verify the queue sizes
         checkNumberOfClientProxies(2);
         for (CacheClientProxy proxy : notifier.getClientProxies()) {
-          assertEquals(numberOfEntries, proxy.getQueueSize());
+          assertThat(proxy.getQueueSize()).isEqualTo(numberOfEntries);
         }
       }
     });
@@ -480,7 +318,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     // Re-start durable client 1
     durableClientVM.invoke(() -> createCacheClient(
         getClientPool(getServerHostName(), server1Port, true), regionName,
-        getClientDistributedSystemProperties(durableClientId), TRUE));
+        getClientDistributedSystemProperties(durableClientId), true));
 
     // Send clientReady message
     sendClientReady(durableClientVM);
@@ -488,7 +326,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     // Re-start durable client 2
     durableClient2VM.invoke(() -> createCacheClient(
         getClientPool(getServerHostName(), server1Port, true), regionName,
-        getClientDistributedSystemProperties(durableClientId2), TRUE));
+        getClientDistributedSystemProperties(durableClientId2), true));
 
     // Send clientReady message
     sendClientReady(durableClient2VM);
@@ -512,9 +350,9 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
   }
 
-  private void verifyMultupleDurableClients(String durableClientId, int durableClientTimeout,
+  private void verifyMultupleDurableClients(String durableClientId,
       String durableClientId2) {
-    server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+    server1VM.invoke("Verify durable client", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
         // Get the CacheClientNotifier
@@ -524,17 +362,17 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
         checkNumberOfClientProxies(2);
         boolean durableClient1Found = false, durableClient2Found = false;
         for (CacheClientProxy proxy : notifier.getClientProxies()) {
-          assertTrue(proxy.isDurable());
+          assertThat(proxy.isDurable()).isTrue();
           if (proxy.getDurableId().equals(durableClientId)) {
             durableClient1Found = true;
           }
           if (proxy.getDurableId().equals(durableClientId2)) {
             durableClient2Found = true;
           }
-          assertEquals(durableClientTimeout, proxy.getDurableTimeout());
+          assertThat(60).isEqualTo(proxy.getDurableTimeout());
         }
-        assertTrue(durableClient1Found);
-        assertTrue(durableClient2Found);
+        assertThat(durableClient1Found).isTrue();
+        assertThat(durableClient2Found).isTrue();
       }
     });
   }
@@ -547,15 +385,15 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
   public void testDurableClientReconnectTwoServers() throws InterruptedException {
     // Start server 1
     server1Port = server1VM.invoke(
-        () -> createCacheServer(regionName, TRUE));
+        () -> createCacheServer(regionName, true));
 
     // on test flag for periodic ack
     server1VM
-        .invoke(() -> setTestFlagToVerifyActForMarker(TRUE));
+        .invoke(() -> setTestFlagToVerifyActForMarker(true));
 
     // Start server 2 using the same mcast port as server 1
     final int server2Port = server2VM
-        .invoke(() -> createCacheServer(regionName, TRUE));
+        .invoke(() -> createCacheServer(regionName, true));
 
     // Stop server 2
     server2VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
@@ -568,7 +406,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     durableClientVM.invoke(() -> createCacheClient(
         getClientPool(getServerHostName(), server1Port, server2Port, true),
         regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout),
-        TRUE));
+        true));
 
     // Send clientReady message
     sendClientReady(durableClientVM);
@@ -576,18 +414,18 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     registerInterest(durableClientVM, regionName, true, NONE);
 
     // Verify durable client on server 1
-    server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+    server1VM.invoke("Verify durable client", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
         // Find the proxy
         checkNumberOfClientProxies(1);
         CacheClientProxy proxy = getClientProxy();
-        assertNotNull(proxy);
+        assertThat(proxy).isNotNull();
 
         // Verify that it is durable and its properties are correct
-        assertTrue(proxy.isDurable());
-        assertEquals(durableClientId, proxy.getDurableId());
-        assertEquals(durableClientTimeout, proxy.getDurableTimeout());
+        assertThat(proxy.isDurable()).isTrue();
+        assertThat(durableClientId).isEqualTo(proxy.getDurableId());
+        assertThat(durableClientTimeout).isEqualTo(proxy.getDurableTimeout());
         verifyReceivedMarkerAck();
       }
     });
@@ -596,21 +434,21 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     pause(5000);
 
     // Stop the durable client
-    durableClientVM.invoke(() -> CacheServerTestUtil.closeCache(TRUE));
+    durableClientVM.invoke(() -> CacheServerTestUtil.closeCache(true));
 
     // Verify durable client on server 1
-    server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+    server1VM.invoke("Verify durable client", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
         // Find the proxy
         checkNumberOfClientProxies(1);
         CacheClientProxy proxy = getClientProxy();
-        assertNotNull(proxy);
+        assertThat(proxy).isNotNull();
       }
     });
 
     // Re-start server2
-    server2VM.invoke(() -> createCacheServer(regionName, TRUE,
+    server2VM.invoke(() -> createCacheServer(regionName, true,
         server2Port));
 
     // Start normal publisher client
@@ -619,20 +457,20 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
 
     // Publish some entries
     final int numberOfEntries = 10;
-    publishEntries(0, numberOfEntries);
+    publishEntries(publisherClientVM, 0, numberOfEntries);
 
     sleep(1000);
 
     // Verify the durable client's queue contains the entries
-    server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+    server1VM.invoke("Verify durable client", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
         // Find the proxy
         CacheClientProxy proxy = getClientProxy();
-        assertNotNull(proxy);
+        assertThat(proxy).isNotNull();
 
         // Verify the queue size
-        assertEquals(numberOfEntries, proxy.getQueueSize());
+        assertThat(numberOfEntries).isEqualTo(proxy.getQueueSize());
       }
     });
 
@@ -641,7 +479,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     durableClientVM.invoke(() -> createCacheClient(
         getClientPool(getServerHostName(), server1Port, server2Port, true),
         regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout),
-        TRUE));
+        true));
 
     // Send clientReady message
     sendClientReady(durableClientVM);
@@ -657,7 +495,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
         server1VM.invoke(DurableClientTestBase::getHARegionQueueName);
     String server2HARegionQueueName =
         server2VM.invoke(DurableClientTestBase::getHARegionQueueName);
-    assertEquals(server1HARegionQueueName, server2HARegionQueueName);
+    assertThat(server1HARegionQueueName).isEqualTo(server2HARegionQueueName);
 
     // Verify the durable client received the updates
     checkListenerEvents(numberOfEntries, 1, -1, durableClientVM);
@@ -669,8 +507,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
 
     // off test flag for periodic ack
-    server1VM
-        .invoke(() -> setTestFlagToVerifyActForMarker(FALSE));
+    server1VM.invoke(() -> setTestFlagToVerifyActForMarker(false));
 
     // Stop server 1
     server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
@@ -683,22 +520,22 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
   public void testReadyForEventsNotCalledImplicitly() {
     // Start a server
     server1Port = server1VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+        .invoke(() -> createCacheServer(regionName, true));
 
     // Start a durable client that is not kept alive on the server when it
     // stops normally
     final String durableClientId = getName() + "_client";
     // make the client use ClientCacheFactory so it will have a default pool
-    durableClientVM.invoke(() -> CacheServerTestUtil.createClientCache(
+    durableClientVM.invoke(() -> createClientCache(
         getClientPool(getServerHostName(), server1Port, true), regionName,
         getClientDistributedSystemProperties(durableClientId)));
 
     // verify that readyForEvents has not yet been called on the client's default pool
-    durableClientVM.invoke(new CacheSerializableRunnable("check readyForEvents not called") {
+    durableClientVM.invoke("check readyForEvents not called", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
         for (Pool p : PoolManager.getAll().values()) {
-          assertFalse(((PoolImpl) p).getReadyForEventsCalled());
+          assertThat(((PoolImpl) p).getReadyForEventsCalled()).isFalse();
         }
       }
     });
@@ -707,7 +544,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     sendClientReady(durableClientVM);
 
     // Verify durable clients on server
-    server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+    server1VM.invoke("Verify durable client", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
         // Get the CacheClientNotifier
@@ -717,14 +554,14 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
         checkNumberOfClientProxies(1);
         boolean durableClient1Found = false;
         for (CacheClientProxy proxy : notifier.getClientProxies()) {
-          assertTrue(proxy.isDurable());
+          assertThat(proxy.isDurable()).isTrue();
           if (proxy.getDurableId().equals(durableClientId)) {
             durableClient1Found = true;
           }
-          assertEquals(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT,
-              proxy.getDurableTimeout());
+          assertThat(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT)
+              .isEqualTo(proxy.getDurableTimeout());
         }
-        assertTrue(durableClient1Found);
+        assertThat(durableClient1Found).isTrue();
       }
     });
 
@@ -742,7 +579,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     regionName = "testReadyForEventsNotCalledImplicitlyWithCacheXML_region";
     // Start a server
     server1Port =
-        server1VM.invoke(() -> CacheServerTestUtil.createCacheServerFromXmlN(
+        server1VM.invoke(() -> createCacheServerFromXmlN(
             DurableClientTestBase.class.getResource("durablecq-server-cache.xml")));
 
     // Start a durable client that is not kept alive on the server when it
@@ -750,16 +587,16 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     final String durableClientId = getName() + "_client";
 
     // create client cache from xml
-    durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClientFromXmlN(
+    durableClientVM.invoke(() -> createCacheClientFromXmlN(
         DurableClientTestBase.class.getResource("durablecq-client-cache.xml"), "client",
-        durableClientId, DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, Boolean.TRUE));
+        durableClientId, DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, true));
 
     // verify that readyForEvents has not yet been called on all the client's pools
-    durableClientVM.invoke(new CacheSerializableRunnable("check readyForEvents not called") {
+    durableClientVM.invoke("check readyForEvents not called", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
         for (Pool p : PoolManager.getAll().values()) {
-          assertFalse(((PoolImpl) p).getReadyForEventsCalled());
+          assertThat(((PoolImpl) p).getReadyForEventsCalled()).isFalse();
         }
       }
     });
@@ -775,31 +612,31 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
         server1VM);
 
     // Start normal publisher client
-    publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
+    publisherClientVM.invoke(() -> createCacheClient(
         getClientPool(getServerHostName(), server1Port, false),
         regionName));
 
     // Publish some entries
     final int numberOfEntries = 10;
-    publishEntries(0, numberOfEntries);
+    publishEntries(publisherClientVM, 0, numberOfEntries);
 
     // Verify the durable client received the updates
     checkListenerEvents(numberOfEntries, 1, -1, durableClientVM);
 
     // Stop the durable client
-    durableClientVM.invoke(() -> CacheServerTestUtil.closeCache(Boolean.TRUE));
+    durableClientVM.invoke(() -> CacheServerTestUtil.closeCache(true));
 
     // Verify the durable client still exists on the server
     verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
         server1VM);
 
     // Publish some more entries
-    publisherClientVM.invoke(new CacheSerializableRunnable("Publish additional updates") {
+    publisherClientVM.invoke("Publish additional updates", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
         // Get the region
-        Region<String, String> region = CacheServerTestUtil.getCache().getRegion(regionName);
-        assertNotNull(region);
+        Region<String, String> region = getCache().getRegion(regionName);
+        assertThat(region).isNotNull();
 
         // Publish some entries
         for (int i = 0; i < numberOfEntries; i++) {
@@ -812,9 +649,9 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
 
     // Re-start the durable client
-    durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClientFromXmlN(
+    durableClientVM.invoke(() -> createCacheClientFromXmlN(
         DurableClientTestBase.class.getResource("durablecq-client-cache.xml"), "client",
-        durableClientId, DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, Boolean.TRUE));
+        durableClientId, DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, true));
 
     // Durable client registers durable cq on server'
     registerInterest(durableClientVM, regionName, true, InterestResultPolicy.KEYS_VALUES);
@@ -850,7 +687,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
 
     // Start a server
     server1Port = server1VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+        .invoke(() -> createCacheServer(regionName, true));
 
     // Start a durable client that is kept alive on the server when it stops
     // normally
@@ -874,7 +711,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     startClient(publisherClientVM, server1Port, regionName);
 
     // Publish some entries
-    publishEntries(regionName, 10);
+    publishEntries(publisherClientVM, regionName, 10);
 
     // verify cq stats are correct
     checkNumDurableCqs(server1VM, durableClientId, 3);
@@ -883,8 +720,8 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     checkCqStatOnServer(server1VM, durableClientId, "LessThan5", 5);
 
     // drop client proxy
-    server1VM.invoke(
-        new CacheSerializableRunnable("Close client proxy on server for client" + durableClientId) {
+    server1VM.invoke("Close client proxy on server for client" + durableClientId,
+        new CacheSerializableRunnable() {
           @Override
           public void run2() throws CacheException {
 
@@ -943,21 +780,21 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
   public void testSimpleDurableClientMultipleServers() {
     // Start server 1
     server1Port = server1VM.invoke(
-        () -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+        () -> createCacheServer(regionName, true));
 
     // Start server 2 using the same mcast port as server 1
     final int server2Port = server2VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+        .invoke(() -> createCacheServer(regionName, true));
 
     // Start a durable client connected to both servers that is kept alive when
     // it stops normally
 
     durableClientId = getName() + "_client";
-    durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
+    durableClientVM.invoke(() -> createCacheClient(
         getClientPool(getServerHostName(), server1Port, server2Port, true),
         regionName,
         getClientDistributedSystemProperties(durableClientId, VERY_LONG_DURABLE_TIMEOUT_SECONDS),
-        Boolean.TRUE));
+        true));
 
     // Send clientReady message
     sendClientReady(durableClientVM);
@@ -969,7 +806,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
     verifyDurableClientPresence(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server2VM, 1);
 
     // Stop the durable client
-    durableClientVM.invoke(() -> CacheServerTestUtil.closeCache(Boolean.TRUE));
+    durableClientVM.invoke(() -> CacheServerTestUtil.closeCache(true));
 
     // Verify the durable client is still on server 1
     verifyDurableClientPresence(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server1VM, 1);
@@ -979,9 +816,9 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
 
     // Start up the client again. This time initialize it so that it is not kept
     // alive on the servers when it stops normally.
-    durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
+    durableClientVM.invoke(() -> createCacheClient(
         getClientPool(getServerHostName(), server1Port, server2Port, true),
-        regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE));
+        regionName, getClientDistributedSystemProperties(durableClientId), true));
 
     // Send clientReady message
     sendClientReady(durableClientVM);
@@ -1014,26 +851,34 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
   public void testDurableClientReceivedClientSessionInitialValue() {
     // Start server 1
     server1Port = server1VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+        .invoke(() -> createCacheServer(regionName, true));
 
     // Start server 2
     int server2Port = server2VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+        .invoke(() -> createCacheServer(regionName, true));
 
     // Start normal publisher client
-    publisherClientVM.invoke(() -> CacheServerTestUtil
-        .createCacheClient(getClientPool(getServerHostName(),
-            server1Port, server2Port, false), regionName));
+    publisherClientVM.invoke(() -> createCacheClient(getClientPool(getServerHostName(),
+        server1Port, server2Port, false), regionName));
 
     // Create an entry
-    publishEntries(0, 1);
+    publishEntries(publisherClientVM, 0, 1);
 
     // Start a durable client with the ControlListener
     durableClientId = getName() + "_client";
-    startupDurableClient(VERY_LONG_DURABLE_TIMEOUT_SECONDS,
-        getClientPool(getServerHostName(), server1Port, server2Port,
-            true),
-        Boolean.TRUE);
+    durableClientVM.invoke(() -> createCacheClient(
+        getClientPool(getServerHostName(), server1Port, server2Port, true), regionName,
+        getClientDistributedSystemProperties(durableClientId, VERY_LONG_DURABLE_TIMEOUT_SECONDS),
+        true));
+
+    durableClientVM.invoke(() -> {
+      await().atMost(HEAVY_TEST_LOAD_DELAY_SUPPORT_MULTIPLIER, MINUTES)
+          .pollInterval(100, MILLISECONDS)
+          .untilAsserted(() -> assertThat(getCache()).isNotNull());
+    });
+
+    // Send clientReady message
+    sendClientReady(durableClientVM);
 
     // Use ClientSession on the server to ` in entry key on behalf of durable client
     boolean server1IsPrimary = false;
@@ -1054,15 +899,17 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
 
     // Wait for QRM to be processed on the secondary
     waitForEventsRemovedByQueueRemovalMessage(server1IsPrimary ? server2VM : server1VM,
-        durableClientId, 2);
+        durableClientId);
 
     // Stop durable client
     disconnectDurableClient(true);
 
     // restart durable client
     restartDurableClient(VERY_LONG_DURABLE_TIMEOUT_SECONDS,
-        getClientPool(getServerHostName(), server1Port, server2Port, true), Boolean.TRUE);
+        getClientPool(getServerHostName(), server1Port, server2Port, true), true);
 
+    // Send clientReady message
+    sendClientReady(durableClientVM);
     // Verify durable client does not receive create event
     checkListenerEvents(0, 1, TYPE_CREATE, durableClientVM);
 
@@ -1091,23 +938,21 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestBase {
 
 
   private void waitForEventsRemovedByQueueRemovalMessage(VM secondaryServerVM,
-      final String durableClientId,
-      final int numEvents) {
+      final String durableClientId) {
     secondaryServerVM.invoke(() -> DurableClientSimpleDUnitTest
-        .waitForEventsRemovedByQueueRemovalMessage(durableClientId, numEvents));
+        .waitForEventsRemovedByQueueRemovalMessage(durableClientId));
   }
 
-  private static void waitForEventsRemovedByQueueRemovalMessage(String durableClientId,
-      final int numEvents) {
+  private static void waitForEventsRemovedByQueueRemovalMessage(String durableClientId) {
     CacheClientNotifier ccn = CacheClientNotifier.getInstance();
     CacheClientProxy ccp = ccn.getClientProxy(durableClientId);
     HARegionQueue haRegionQueue = ccp.getHARegionQueue();
     HARegionQueueStats haRegionQueueStats = haRegionQueue.getStatistics();
     await()
         .untilAsserted(
-            () -> assertEquals(
-                "Expected queue removal messages: " + numEvents + " but actual messages: "
-                    + haRegionQueueStats.getEventsRemovedByQrm(),
-                numEvents, haRegionQueueStats.getEventsRemovedByQrm()));
+            () -> assertThat(haRegionQueueStats.getEventsRemovedByQrm()).describedAs(
+                "Expected queue removal messages: " + 2 + " but actual messages: "
+                    + haRegionQueueStats.getEventsRemovedByQrm())
+                .isEqualTo(2));
   }
 }
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java
index 7212e31bdb..bd742fe708 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java
@@ -21,12 +21,18 @@ import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIEN
 import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.ControlCqListener;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.ControlListener;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheClient;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheServer;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.getCache;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.getClientCache;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.getPool;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertNotNull;
 
 import java.time.Duration;
 import java.util.Iterator;
@@ -34,8 +40,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.InterestResultPolicy;
 import org.apache.geode.cache.Region;
@@ -59,18 +63,15 @@ import org.apache.geode.internal.cache.CacheServerImpl;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.PoolFactoryImpl;
 import org.apache.geode.internal.cache.ha.HARegionQueue;
-import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.NetworkUtils;
 import org.apache.geode.test.dunit.SerializableRunnableIF;
 import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.WaitCriterion;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
 
 public class DurableClientTestBase extends JUnit4DistributedTestCase {
 
-  protected static final Logger logger = LogService.getLogger();
   private static final Duration VERY_LONG_DURABLE_CLIENT_TIMEOUT = Duration.ofMinutes(10);
   static final int VERY_LONG_DURABLE_TIMEOUT_SECONDS =
       (int) VERY_LONG_DURABLE_CLIENT_TIMEOUT.getSeconds();
@@ -86,7 +87,7 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
   String durableClientId;
 
   @Override
-  public final void postSetUp() throws Exception {
+  public final void postSetUp() {
     server1VM = VM.getVM(0);
     server2VM = VM.getVM(1);
     durableClientVM = VM.getVM(2);
@@ -117,10 +118,22 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
   void startupDurableClientAndServer(final int durableClientTimeout) {
 
     server1Port = server1VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+        .invoke(() -> createCacheServer(regionName, Boolean.TRUE));
 
     durableClientId = getName() + "_client";
-    startupDurableClient(durableClientTimeout, Boolean.TRUE);
+    durableClientVM.invoke(() -> createCacheClient(
+        getClientPool(NetworkUtils.getServerHostName(), server1Port, true), regionName,
+        getClientDistributedSystemProperties(durableClientId, durableClientTimeout),
+        Boolean.TRUE));
+
+    durableClientVM.invoke(() -> {
+      await().atMost(HEAVY_TEST_LOAD_DELAY_SUPPORT_MULTIPLIER, MINUTES)
+          .pollInterval(100, MILLISECONDS)
+          .until(CacheServerTestUtil::getCache, notNullValue());
+    });
+
+    // Send clientReady message
+    sendClientReady(durableClientVM);
     verifyDurableClientPresent(durableClientTimeout, durableClientId, server1VM);
 
   }
@@ -128,24 +141,26 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
   // This exists so child classes can override the behavior and mock out network failures
   public void restartDurableClient(int durableClientTimeout, Pool clientPool,
       Boolean addControlListener) {
-    startupDurableClient(durableClientTimeout, clientPool, addControlListener);
+    durableClientVM.invoke(() -> createCacheClient(clientPool, regionName,
+        getClientDistributedSystemProperties(durableClientId, durableClientTimeout),
+        addControlListener));
+
+    durableClientVM.invoke(() -> {
+      await().atMost(HEAVY_TEST_LOAD_DELAY_SUPPORT_MULTIPLIER, MINUTES)
+          .pollInterval(100, MILLISECONDS)
+          .until(CacheServerTestUtil::getCache, notNullValue());
+    });
   }
 
   // This exists so child classes can override the behavior and mock out network failures
   public void restartDurableClient(int durableClientTimeout, Boolean addControlListener) {
-    startupDurableClient(durableClientTimeout, addControlListener);
-  }
-
-
-  void startupDurableClient(int durableClientTimeout, Pool clientPool,
-      Boolean addControlListener) {
-    durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        clientPool,
-        regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout),
+    durableClientVM.invoke(() -> createCacheClient(
+        getClientPool(NetworkUtils.getServerHostName(), server1Port, true), regionName,
+        getClientDistributedSystemProperties(durableClientId, durableClientTimeout),
         addControlListener));
 
     durableClientVM.invoke(() -> {
-      await().atMost(1 * HEAVY_TEST_LOAD_DELAY_SUPPORT_MULTIPLIER, MINUTES)
+      await().atMost(HEAVY_TEST_LOAD_DELAY_SUPPORT_MULTIPLIER, MINUTES)
           .pollInterval(100, MILLISECONDS)
           .until(CacheServerTestUtil::getCache, notNullValue());
     });
@@ -154,46 +169,24 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
     sendClientReady(durableClientVM);
   }
 
-  private void startupDurableClient(int durableClientTimeout, Boolean addControlListener) {
-    startupDurableClient(durableClientTimeout,
-        getClientPool(NetworkUtils.getServerHostName(), server1Port, true), addControlListener);
-  }
-
-  void verifySimpleDurableClient() {
-    verifyDurableClientNotPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT,
-        durableClientId, durableClientVM);
-  }
-
   void verifyDurableClientPresent(int durableClientTimeout, String durableClientId,
       final VM serverVM) {
     verifyDurableClientPresence(durableClientTimeout, durableClientId, serverVM, 1);
   }
 
-  void verifyDurableClientNotPresent(int durableClientTimeout, String durableClientId,
+  void verifyDurableClientNotPresent(String durableClientId,
       final VM serverVM) {
-    verifyDurableClientPresence(durableClientTimeout, durableClientId, serverVM, 0);
+    verifyDurableClientPresence(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
+        serverVM, 0);
   }
 
-  void waitForDurableClientPresence(String durableClientId, VM serverVM, final int count) {
+  void waitForDurableClientPresence(String durableClientId, VM serverVM) {
     serverVM.invoke(() -> {
-      if (count > 0) {
-
-        WaitCriterion ev = new WaitCriterion() {
-          @Override
-          public boolean done() {
-            checkNumberOfClientProxies(count);
-            CacheClientProxy proxy = getClientProxy();
-
-            return proxy != null && durableClientId.equals(proxy.getDurableId());
-          }
-
-          @Override
-          public String description() {
-            return null;
-          }
-        };
-        GeodeAwaitility.await().untilAsserted(ev);
-      }
+      GeodeAwaitility.await().until(() -> {
+        checkNumberOfClientProxies(1);
+        CacheClientProxy proxy = getClientProxy();
+        return proxy != null && durableClientId.equals(proxy.getDurableId());
+      });
     });
   }
 
@@ -204,9 +197,7 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
 
       if (count > 0) {
         CacheClientProxy proxy = getClientProxy();
-
         assertThat(proxy).isNotNull();
-        // checkProxyIsAlive(proxy);
 
         // Verify that it is durable and its properties are correct
         assertThat(proxy.isDurable()).isTrue();
@@ -216,33 +207,19 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
     });
   }
 
+  void waitUntilHARegionQueueSizeIsZero(VM serverVM) {
+    serverVM.invoke(() -> await().atMost(60, SECONDS)
+        .until(() -> getClientProxy().getHARegionQueue().size() == 0));
+  }
+
   public void closeDurableClient() {
     durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
   }
 
   public void disconnectDurableClient(boolean keepAlive) {
-    printClientProxyState("Before");
     durableClientVM.invoke("close durable client cache",
         () -> CacheServerTestUtil.closeCache(keepAlive));
-    await()
-        .until(CacheServerTestUtil::getCache, nullValue());
-    printClientProxyState("after");
-  }
-
-  private void printClientProxyState(String st) {
-    CacheSerializableRunnable s =
-        new CacheSerializableRunnable("Logging CCCP and ServerConnection state") {
-          @Override
-          public void run2() throws CacheException {
-            // TODO Auto-generated method stub
-            CacheServerTestUtil.getCache().getLogger()
-                .info(st + " CCP states: " + getAllClientProxyState());
-            CacheServerTestUtil.getCache().getLogger().info(st + " CHM states: "
-                + printMap(
-                    ClientHealthMonitor.getInstance().getConnectedClients(null)));
-          }
-        };
-    server1VM.invoke(s);
+    await().until(CacheServerTestUtil::getCache, nullValue());
   }
 
   private static String printMap(Map<String, Object[]> m) {
@@ -297,11 +274,11 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
     }
 
     registerInterest(client, regionName, false, InterestResultPolicy.NONE);
-    server.invoke(new CacheSerializableRunnable("flush entries") {
+    server.invoke("flush entries", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
-        Region<String, String> region = CacheServerTestUtil.getCache().getRegion(regionName);
-        assertNotNull(region);
+        Region<String, String> region = getCache().getRegion(regionName);
+        assertThat(region).isNotNull();
         region.put("LAST", "ENTRY");
       }
     });
@@ -312,7 +289,7 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
   // The client will then be able to continue, and get rejected
   // Then we proceed to drain and release all locks
   // The client will then reconnect
-  public class RejectClientReconnectTestHook implements CacheClientProxy.TestHook {
+  public static class RejectClientReconnectTestHook implements CacheClientProxy.TestHook {
     final CountDownLatch reconnectLatch = new CountDownLatch(1);
     final CountDownLatch continueDrain = new CountDownLatch(1);
     volatile boolean clientWasRejected = false;
@@ -358,7 +335,8 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
    * wait to continue server will be unblocked, and rejected client will the be unlocked after
    * server is rejected and continue
    */
-  public class CqExceptionDueToActivatingClientTestHook implements CacheClientProxy.TestHook {
+  public static class CqExceptionDueToActivatingClientTestHook
+      implements CacheClientProxy.TestHook {
     final CountDownLatch unblockDrain = new CountDownLatch(1);
     final CountDownLatch unblockClient = new CountDownLatch(1);
     final CountDownLatch finish = new CountDownLatch(1);
@@ -399,9 +377,9 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
 
   CqQuery createCq(String cqName, String cqQuery, boolean durable)
       throws CqException, CqExistsException {
-    QueryService qs = CacheServerTestUtil.getCache().getQueryService();
+    QueryService qs = getCache().getQueryService();
     CqAttributesFactory cqf = new CqAttributesFactory();
-    CqListener[] cqListeners = {new CacheServerTestUtil.ControlCqListener()};
+    CqListener[] cqListeners = {new ControlCqListener()};
     cqf.initCqListeners(cqListeners);
     CqAttributes cqa = cqf.create();
     return qs.newCq(cqName, cqQuery, cqa, durable);
@@ -464,15 +442,7 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
   }
 
   static void checkNumberOfClientProxies(final int expected) {
-    await()
-        .until(() -> {
-          return expected == getNumberOfClientProxies();
-        });
-  }
-
-  static void checkProxyIsAlive(final CacheClientProxy proxy) {
-    await()
-        .until(proxy::isAlive);
+    await().atMost(30, SECONDS).until(() -> expected == getNumberOfClientProxies());
   }
 
   private static int getNumberOfClientProxies() {
@@ -481,7 +451,7 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
 
   static CacheServerImpl getBridgeServer() {
     CacheServerImpl bridgeServer =
-        (CacheServerImpl) CacheServerTestUtil.getCache().getCacheServers().iterator().next();
+        (CacheServerImpl) getCache().getCacheServers().iterator().next();
     assertThat(bridgeServer).isNotNull();
     return bridgeServer;
   }
@@ -520,21 +490,21 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
 
   void sendClientReady(VM vm) {
     // Send clientReady message
-    vm.invoke(new CacheSerializableRunnable("Send clientReady") {
+    vm.invoke("Send clientReady", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
-        CacheServerTestUtil.getClientCache().readyForEvents();
+        getClientCache().readyForEvents();
       }
     });
   }
 
   protected void registerInterest(VM vm, final String regionName, final boolean durable,
       final InterestResultPolicy interestResultPolicy) {
-    vm.invoke(new CacheSerializableRunnable("Register interest on region : " + regionName) {
+    vm.invoke("Register interest on region : " + regionName, new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
 
-        Region<Object, Object> region = CacheServerTestUtil.getCache().getRegion(regionName);
+        Region<Object, Object> region = getCache().getRegion(regionName);
         assertThat(region).isNotNull();
 
         // Register interest in all keys
@@ -552,7 +522,7 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
   }
 
   void createCq(VM vm, final String cqName, final String cqQuery, final boolean durable) {
-    vm.invoke(new CacheSerializableRunnable("Register cq " + cqName) {
+    vm.invoke("Register cq " + cqName, new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
 
@@ -567,14 +537,13 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
   }
 
   // Publishes strings
-  void publishEntries(int startingValue, final int count) {
-    publisherClientVM.invoke(new CacheSerializableRunnable("Publish entries") {
+  void publishEntries(VM publisherClientVM, int startingValue, final int count) {
+    publisherClientVM.invoke("Publish entries", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
-        Region<String, String> region = CacheServerTestUtil.getCache().getRegion(
+        Region<String, String> region = getCache().getRegion(
             regionName);
         assertThat(region).isNotNull();
-
         // Publish some entries
         for (int i = startingValue; i < startingValue + count; i++) {
           String keyAndValue = String.valueOf(i);
@@ -587,12 +556,12 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
   }
 
   // Publishes portfolios
-  void publishEntries(final String regionName, final int numEntries) {
-    publisherClientVM.invoke(new CacheSerializableRunnable("publish " + numEntries + " entries") {
+  void publishEntries(VM publisherClientVM, final String regionName, final int numEntries) {
+    publisherClientVM.invoke("publish " + numEntries + " entries", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
         // Get the region
-        Region<Object, Object> region = CacheServerTestUtil.getCache().getRegion(regionName);
+        Region<Object, Object> region = getCache().getRegion(regionName);
         assertThat(region).isNotNull();
 
         // Publish some entries
@@ -612,25 +581,27 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
 
   void checkCqStatOnServer(VM server, final String durableClientId, final String cqName,
       final int expectedNumber) {
-    server.invoke(new CacheSerializableRunnable(
-        "Check ha queued cq stats for durable client " + durableClientId + " cq: " + cqName) {
-      @Override
-      public void run2() throws CacheException {
+    server.invoke(
+        "Check ha queued cq stats for durable client " + durableClientId + " cq: " + cqName,
+        new CacheSerializableRunnable() {
+          @Override
+          public void run2() throws CacheException {
 
-        final CacheClientNotifier ccnInstance = CacheClientNotifier.getInstance();
-        final CacheClientProxy clientProxy = ccnInstance.getClientProxy(durableClientId);
-        ClientProxyMembershipID proxyId = clientProxy.getProxyID();
-        CqService cqService = ((InternalCache) CacheServerTestUtil.getCache()).getCqService();
-        cqService.start();
-        final CqQueryImpl cqQuery = (CqQueryImpl) cqService.getClientCqFromServer(proxyId, cqName);
+            final CacheClientNotifier ccnInstance = CacheClientNotifier.getInstance();
+            final CacheClientProxy clientProxy = ccnInstance.getClientProxy(durableClientId);
+            ClientProxyMembershipID proxyId = clientProxy.getProxyID();
+            CqService cqService = ((InternalCache) getCache()).getCqService();
+            cqService.start();
+            final CqQueryImpl cqQuery =
+                (CqQueryImpl) cqService.getClientCqFromServer(proxyId, cqName);
 
-        // Wait until we get the expected number of events or until 10 seconds are up
-        await()
-            .until(() -> cqQuery.getVsdStats().getNumHAQueuedEvents() == expectedNumber);
+            // Wait until we get the expected number of events or until 10 seconds are up
+            await()
+                .until(() -> cqQuery.getVsdStats().getNumHAQueuedEvents() == expectedNumber);
 
-        assertThat(expectedNumber).isEqualTo(cqQuery.getVsdStats().getNumHAQueuedEvents());
-      }
-    });
+            assertThat(expectedNumber).isEqualTo(cqQuery.getVsdStats().getNumHAQueuedEvents());
+          }
+        });
   }
 
   /*
@@ -639,44 +610,40 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
    */
   void checkHAQueueSize(VM server, final String durableClientId, final int expectedNumber,
       final int remaining) {
-    server.invoke(new CacheSerializableRunnable(
-        "Check ha queued size for durable client " + durableClientId) {
-      @Override
-      public void run2() throws CacheException {
-
-        final CacheClientNotifier ccnInstance = CacheClientNotifier.getInstance();
-        final CacheClientProxy clientProxy = ccnInstance.getClientProxy(durableClientId);
+    server.invoke("Check ha queued size for durable client " + durableClientId,
+        new CacheSerializableRunnable() {
+          @Override
+          public void run2() throws CacheException {
 
-        // Wait until we get the expected number of events or until 10 seconds are up
-        await()
-            .until(() -> clientProxy.getQueueSizeStat() == expectedNumber
-                || clientProxy.getQueueSizeStat() == remaining);
+            final CacheClientNotifier ccnInstance = CacheClientNotifier.getInstance();
+            final CacheClientProxy clientProxy = ccnInstance.getClientProxy(durableClientId);
 
-        assertThat(clientProxy.getQueueSizeStat() == expectedNumber
-            || clientProxy.getQueueSizeStat() == remaining).isTrue();
-      }
-    });
+            // Wait until we get the expected number of events or until 10 seconds are up
+            await().untilAsserted(
+                () -> assertThat(clientProxy.getQueueSizeStat()).isIn(expectedNumber, remaining));
+          }
+        });
   }
 
   void checkNumDurableCqs(VM server, final String durableClientId,
       final int expectedNumber) {
-    server.invoke(new CacheSerializableRunnable(
-        "check number of durable cqs on server for durable client: " + durableClientId) {
-      @Override
-      public void run2() throws CacheException {
-        try {
-          final CacheClientNotifier ccnInstance = CacheClientNotifier.getInstance();
-          final CacheClientProxy clientProxy = ccnInstance.getClientProxy(durableClientId);
-          ClientProxyMembershipID proxyId = clientProxy.getProxyID();
-          CqService cqService = ((InternalCache) CacheServerTestUtil.getCache()).getCqService();
-          cqService.start();
-          java.util.List<String> cqNames = cqService.getAllDurableClientCqs(proxyId);
-          assertThat(expectedNumber).isEqualTo(cqNames.size());
-        } catch (Exception e) {
-          throw new CacheException(e) {};
-        }
-      }
-    });
+    server.invoke("check number of durable cqs on server for durable client: " + durableClientId,
+        new CacheSerializableRunnable() {
+          @Override
+          public void run2() throws CacheException {
+            try {
+              final CacheClientNotifier ccnInstance = CacheClientNotifier.getInstance();
+              final CacheClientProxy clientProxy = ccnInstance.getClientProxy(durableClientId);
+              ClientProxyMembershipID proxyId = clientProxy.getProxyID();
+              CqService cqService = ((InternalCache) getCache()).getCqService();
+              cqService.start();
+              java.util.List<String> cqNames = cqService.getAllDurableClientCqs(proxyId);
+              assertThat(expectedNumber).isEqualTo(cqNames.size());
+            } catch (Exception e) {
+              throw new CacheException(e) {};
+            }
+          }
+        });
   }
 
   /*
@@ -687,19 +654,16 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
    */
   void checkCqListenerEvents(VM vm, final String cqName, final int numEvents,
       final int secondsToWait) {
-    vm.invoke(() -> {
-      checkCqListenerEvents(cqName, numEvents, secondsToWait);
-    });
+    vm.invoke(() -> checkCqListenerEvents(cqName, numEvents, secondsToWait));
   }
 
-  void checkCqListenerEvents(final String cqName, final int numEvents,
-      final int secondsToWait) {
-    QueryService qs = CacheServerTestUtil.getCache().getQueryService();
+  void checkCqListenerEvents(final String cqName, final int numEvents, final int secondsToWait) {
+    QueryService qs = getCache().getQueryService();
     CqQuery cq = qs.getCq(cqName);
     // Get the listener and wait for the appropriate number of events
-    CacheServerTestUtil.ControlCqListener listener =
-        (CacheServerTestUtil.ControlCqListener) cq.getCqAttributes().getCqListener();
-    listener.waitWhileNotEnoughEvents(secondsToWait * 1000, numEvents);
+    ControlCqListener listener =
+        (ControlCqListener) cq.getCqAttributes().getCqListener();
+    listener.waitWhileNotEnoughEvents(SECONDS.toMillis(secondsToWait), numEvents);
     assertThat(numEvents).isEqualTo(listener.events.size());
   }
 
@@ -707,21 +671,20 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
       final VM vm) {
     vm.invoke(() -> {
       // Get the region
-      Region<Object, Object> region = CacheServerTestUtil.getCache().getRegion(regionName);
+      Region<Object, Object> region = getCache().getRegion(regionName);
       assertThat(region).isNotNull();
 
       // Get the listener and wait for the appropriate number of events
-      CacheServerTestUtil.ControlListener controlListener =
-          (CacheServerTestUtil.ControlListener) region.getAttributes().getCacheListeners()[0];
-
-      controlListener.waitWhileNotEnoughEvents(sleepMinutes * 60 * 1000, numberOfEntries,
+      ControlListener controlListener =
+          (ControlListener) region.getAttributes().getCacheListeners()[0];
+      controlListener.waitWhileNotEnoughEvents(MINUTES.toMillis(sleepMinutes), numberOfEntries,
           controlListener.getEvents(eventType));
     });
   }
 
   void startDurableClient(VM vm, String durableClientId, int serverPort1,
       String regionName, int durableTimeoutInSeconds) {
-    vm.invoke(() -> CacheServerTestUtil.createCacheClient(
+    vm.invoke(() -> createCacheClient(
         getClientPool(NetworkUtils.getServerHostName(), serverPort1, true),
         regionName, getClientDistributedSystemProperties(durableClientId, durableTimeoutInSeconds),
         Boolean.TRUE));
@@ -730,38 +693,35 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
   void startDurableClient(VM vm, String durableClientId, int serverPort1,
       String regionName) {
     vm.invoke(() -> {
-      CacheServerTestUtil.createCacheClient(
+      createCacheClient(
           getClientPool(NetworkUtils.getServerHostName(), serverPort1, true),
           regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE);
-      assertThat(CacheServerTestUtil.getClientCache()).isNotNull();
+      assertThat(getClientCache()).isNotNull();
     });
   }
 
   void startDurableClient(VM vm, String durableClientId, int serverPort1, int serverPort2,
       String regionName) {
-    vm.invoke(() -> CacheServerTestUtil.createCacheClient(
+    vm.invoke(() -> createCacheClient(
         getClientPool(NetworkUtils.getServerHostName(), serverPort1, serverPort2, true),
         regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE));
   }
 
   void startClient(VM vm, int serverPort1, String regionName) {
     vm.invoke(() -> {
-      CacheServerTestUtil.createCacheClient(
+      createCacheClient(
           getClientPool(NetworkUtils.getServerHostName(), serverPort1, false),
           regionName);
-      assertThat(CacheServerTestUtil.getClientCache()).isNotNull();
+      assertThat(getClientCache()).isNotNull();
     });
   }
 
   void checkPrimaryUpdater(VM vm) {
-    vm.invoke(new CacheSerializableRunnable("Verify durable client") {
+    vm.invoke("Verify durable client", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
-
-        await()
-            .until(() -> CacheServerTestUtil.getPool().isPrimaryUpdaterAlive());
-
-        assertThat(CacheServerTestUtil.getPool().isPrimaryUpdaterAlive()).isTrue();
+        await().until(() -> getPool().isPrimaryUpdaterAlive());
+        assertThat(getPool().isPrimaryUpdaterAlive()).isTrue();
       }
     });
   }
@@ -769,6 +729,4 @@ public class DurableClientTestBase extends JUnit4DistributedTestCase {
   protected void closeCache(VM vm) {
     vm.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
   }
-
-
 }
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestCase.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestCase.java
index 82f086f4c3..d928b22d75 100755
--- a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestCase.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestCase.java
@@ -16,12 +16,18 @@ package org.apache.geode.internal.cache.tier.sockets;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheClient;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheServer;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheServerReturnPorts;
 import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.getCache;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.getClientCache;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.unsetJavaSystemProperties;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.hamcrest.Matchers.notNullValue;
 
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -71,26 +77,26 @@ public class DurableClientTestCase extends DurableClientTestBase {
    * not
    */
   @Test
-  public void testSpecialDurableProperty() throws InterruptedException {
+  public void testSpecialDurableProperty() {
     final Properties jp = new Properties();
     jp.setProperty(GeodeGlossary.GEMFIRE_PREFIX + "SPECIAL_DURABLE", "true");
 
     try {
 
       server1Port = server1VM
-          .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+          .invoke(() -> createCacheServer(regionName, true));
 
       durableClientId = getName() + "_client";
       final String dId = durableClientId + "_gem_" + "CacheServerTestUtil";
 
-      durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-          getClientPool(NetworkUtils.getServerHostName(), server1Port, Boolean.TRUE),
+      durableClientVM.invoke(() -> createCacheClient(
+          getClientPool(NetworkUtils.getServerHostName(), server1Port, true),
           regionName, getClientDistributedSystemProperties(durableClientId,
               DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT),
-          Boolean.TRUE, jp));
+          true, jp));
 
       durableClientVM.invoke(() -> {
-        await().atMost(1 * HEAVY_TEST_LOAD_DELAY_SUPPORT_MULTIPLIER, MINUTES)
+        await().atMost(HEAVY_TEST_LOAD_DELAY_SUPPORT_MULTIPLIER, MINUTES)
             .pollInterval(100, MILLISECONDS)
             .until(CacheServerTestUtil::getCache, notNullValue());
       });
@@ -122,7 +128,7 @@ public class DurableClientTestCase extends DurableClientTestBase {
       disconnectDurableClient(false);
 
       // Verify the durable client is present on the server for closeCache=false case.
-      verifyDurableClientNotPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT,
+      verifyDurableClientNotPresent(
           durableClientId, server1VM);
 
       // Stop the server
@@ -131,7 +137,7 @@ public class DurableClientTestCase extends DurableClientTestBase {
       closeDurableClient();
     } finally {
 
-      durableClientVM.invoke(() -> CacheServerTestUtil.unsetJavaSystemProperties(jp));
+      durableClientVM.invoke(() -> unsetJavaSystemProperties(jp));
     }
 
   }
@@ -151,7 +157,7 @@ public class DurableClientTestCase extends DurableClientTestBase {
     verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server1VM);
 
     // Re-start the durable client
-    restartDurableClient(VERY_LONG_DURABLE_TIMEOUT_SECONDS, Boolean.TRUE);
+    restartDurableClient(VERY_LONG_DURABLE_TIMEOUT_SECONDS, true);
 
     // Verify durable client on server
     verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server1VM);
@@ -181,13 +187,12 @@ public class DurableClientTestCase extends DurableClientTestBase {
       assertThat(proxy).isNotNull();
       assertThat(proxy._socket).isNotNull();
 
-      await()
-          .untilAsserted(() -> assertThat(proxy._socket.isClosed()).isTrue());
+      await().untilAsserted(() -> assertThat(proxy._socket.isClosed()).isTrue());
     });
 
     // Re-start the durable client (this is necessary so the
     // netDown test will set the appropriate system properties.
-    restartDurableClient(VERY_LONG_DURABLE_TIMEOUT_SECONDS, Boolean.TRUE);
+    restartDurableClient(VERY_LONG_DURABLE_TIMEOUT_SECONDS, true);
 
     // Stop the durable client
     durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
@@ -210,7 +215,7 @@ public class DurableClientTestCase extends DurableClientTestBase {
     disconnectDurableClient(true);
 
     // Verify it no longer exists on the server
-    server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+    server1VM.invoke("Verify durable client", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
         // Find the proxy
@@ -220,7 +225,7 @@ public class DurableClientTestCase extends DurableClientTestBase {
       }
     });
 
-    restartDurableClient(durableClientTimeout, Boolean.TRUE);
+    restartDurableClient(durableClientTimeout, true);
 
     // Stop the durable client
     durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
@@ -239,13 +244,13 @@ public class DurableClientTestCase extends DurableClientTestBase {
     registerInterest(durableClientVM, regionName, true, InterestResultPolicy.NONE);
 
     // Start normal publisher client
-    publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
+    publisherClientVM.invoke(() -> createCacheClient(
         getClientPool(NetworkUtils.getServerHostName(), server1Port,
             false),
         regionName));
 
     // Publish some entries
-    publishEntries(0, 1);
+    publishEntries(publisherClientVM, 0, 1);
 
     // Wait until queue count is 0 on server1VM
     waitUntilQueueContainsRequiredNumberOfEvents(server1VM, 0);
@@ -260,7 +265,7 @@ public class DurableClientTestCase extends DurableClientTestBase {
     server1VM.invoke(DurableClientTestBase::waitForCacheClientProxyPaused);
 
     // Publish some more entries
-    publishEntries(1, 1);
+    publishEntries(publisherClientVM, 1, 1);
 
     // Verify the durable client's queue contains the entries
     waitUntilQueueContainsRequiredNumberOfEvents(server1VM, 1);
@@ -269,7 +274,7 @@ public class DurableClientTestCase extends DurableClientTestBase {
     verifyListenerUpdatesDisconnected(1);
 
     // Re-start the durable client
-    restartDurableClient(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, Boolean.TRUE);
+    restartDurableClient(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, true);
 
     // Verify durable client on server
     verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
@@ -299,13 +304,13 @@ public class DurableClientTestCase extends DurableClientTestBase {
     registerInterest(durableClientVM, regionName, true, InterestResultPolicy.NONE);
 
     // Start normal publisher client
-    publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
+    publisherClientVM.invoke(() -> createCacheClient(
         getClientPool(NetworkUtils.getServerHostName(), server1Port,
             false),
         regionName));
 
     // Publish some entries
-    publishEntries(0, 1);
+    publishEntries(publisherClientVM, 0, 1);
 
     // Verify the durable client received the updates
     checkListenerEvents(1, 1, -1, durableClientVM);
@@ -324,10 +329,10 @@ public class DurableClientTestCase extends DurableClientTestBase {
     server1VM.invoke(DurableClientTestBase::waitForCacheClientProxyPaused);
 
     // Publish some entries
-    publishEntries(1, 1);
+    publishEntries(publisherClientVM, 1, 1);
 
     // Verify the durable client's queue contains the entries
-    server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+    server1VM.invoke("Verify durable client", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
         CacheClientProxy proxy = getClientProxy();
@@ -341,7 +346,7 @@ public class DurableClientTestCase extends DurableClientTestBase {
     verifyListenerUpdatesDisconnected(1);
 
     // Re-start the durable client
-    restartDurableClient(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, Boolean.TRUE);
+    restartDurableClient(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, true);
 
     // Verify durable client on server
     verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
@@ -368,26 +373,26 @@ public class DurableClientTestCase extends DurableClientTestBase {
     // Start a server
     // Start server 1
     Integer[] ports = server1VM.invoke(
-        () -> CacheServerTestUtil.createCacheServerReturnPorts(regionName, Boolean.TRUE));
+        () -> createCacheServerReturnPorts(regionName, true));
     final int serverPort = ports[0];
 
     // Start a durable client that is not kept alive on the server when it
     // stops normally
     final String durableClientId = getName() + "_client";
-    durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
+    durableClientVM.invoke(() -> createCacheClient(
         getClientPool(NetworkUtils.getServerHostName(), serverPort, true),
-        regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE));
+        regionName, getClientDistributedSystemProperties(durableClientId), true));
+
+    registerInterest(durableClientVM, regionName, true, InterestResultPolicy.NONE);
 
     // Send clientReady message
-    durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+    durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
-        CacheServerTestUtil.getClientCache().readyForEvents();
+        getClientCache().readyForEvents();
       }
     });
 
-    registerInterest(durableClientVM, regionName, true, InterestResultPolicy.NONE);
-
     // Verify durable client on server
     verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
         server1VM);
@@ -396,21 +401,18 @@ public class DurableClientTestCase extends DurableClientTestBase {
     server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
 
     // Re-start the server
-    server1VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE,
-        serverPort));
+    server1VM.invoke(() -> createCacheServer(regionName, true, serverPort));
 
     // Verify durable client on server
     verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
         server1VM);
 
     // Start a publisher
-    publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(), serverPort,
-            false),
-        regionName));
+    publisherClientVM.invoke(() -> createCacheClient(getClientPool(NetworkUtils.getServerHostName(),
+        serverPort, false), regionName));
 
     // Publish some entries
-    publishEntries(0, 10);
+    publishEntries(publisherClientVM, 0, 10);
 
     // Verify the durable client received the updates
     checkListenerEvents(10, 1, -1, durableClientVM);
@@ -429,6 +431,10 @@ public class DurableClientTestCase extends DurableClientTestBase {
   @Test
   public void testDurableNonHAFailover() {
     durableFailover(0);
+  }
+
+  @Test
+  public void testDurableNonHAFailoverAfterReconnect() {
     durableFailoverAfterReconnect(0);
   }
 
@@ -437,6 +443,12 @@ public class DurableClientTestCase extends DurableClientTestBase {
     // Clients see this when the servers disconnect
     IgnoredException.addIgnoredException("Could not find any server");
     durableFailover(1);
+  }
+
+  @Test
+  public void testDurableHAFailoverAfterReconnect() {
+    // Clients see this when the servers disconnect
+    IgnoredException.addIgnoredException("Could not find any server");
     durableFailoverAfterReconnect(1);
   }
 
@@ -446,14 +458,10 @@ public class DurableClientTestCase extends DurableClientTestBase {
    * Redundancy level is set to 1 for this test case.
    */
   private void durableFailover(int redundancyLevel) {
-
     // Start server 1
-    server1Port = server1VM.invoke(
-        () -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+    server1Port = server1VM.invoke(() -> createCacheServer(regionName, true));
 
-    // Start server 2 using the same mcast port as server 1
-    final int server2Port = server2VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
+    final int server2Port = server2VM.invoke(() -> createCacheServer(regionName, true));
 
     // Stop server 2
     server2VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
@@ -471,61 +479,74 @@ public class DurableClientTestCase extends DurableClientTestBase {
     }
 
     durableClientVM.invoke(CacheServerTestUtil::disableShufflingOfEndpoints);
-    durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(clientPool, regionName,
+    durableClientVM.invoke(() -> createCacheClient(clientPool, regionName,
         getClientDistributedSystemProperties(durableClientId, VERY_LONG_DURABLE_TIMEOUT_SECONDS),
-        Boolean.TRUE));
+        true));
+
+    registerInterest(durableClientVM, regionName, true, InterestResultPolicy.NONE);
 
     // Send clientReady message
-    durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+    durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
-        CacheServerTestUtil.getClientCache().readyForEvents();
+        getCache().readyForEvents();
       }
     });
 
-    registerInterest(durableClientVM, regionName, true, InterestResultPolicy.NONE);
+    // Verify the durable client is connected to server1
+    verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server1VM);
 
-    // Re-start server2
-    server2VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE,
-        server2Port));
+    // Re-start server2 and wait for it to be up.
+    server2VM.invoke(() -> createCacheServer(regionName, true, server2Port));
 
+    if (redundancyLevel == 1) {
+      // Verify the durable client is connected to server2
+      verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server2VM);
+    }
     // Start normal publisher client
-    publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(), server1Port,
-            server2Port, false),
-        regionName));
+    publisherClientVM.invoke(() -> createCacheClient(getClientPool(
+        NetworkUtils.getServerHostName(), server1Port, server2Port,
+        false), regionName));
 
     // Publish some entries
-    publishEntries(0, 1);
+    publishEntries(publisherClientVM, 0, 1);
 
     // Verify the durable client received the updates
     checkListenerEvents(1, 1, -1, durableClientVM);
 
+    // Wait to until the HARegionQueue has emptied before disconnecting the Durable client
+    // to avoid having left over messages that processed, in particular, the key 0 message
+    waitUntilHARegionQueueSizeIsZero(server1VM);
+
     // Stop the durable client, which discards the known entry
     disconnectDurableClient(true);
 
-    // Publish updates during client downtime
-    publishEntries(1, 1);
+    // Put key 1 during client downtime
+    publishEntries(publisherClientVM, 1, 1);
 
     // Re-start the durable client that is kept alive on the server
-    restartDurableClient(VERY_LONG_DURABLE_TIMEOUT_SECONDS, clientPool, Boolean.TRUE);
+    restartDurableClient(VERY_LONG_DURABLE_TIMEOUT_SECONDS, clientPool, true);
 
+    // Re-register interest
     registerInterest(durableClientVM, regionName, true, InterestResultPolicy.NONE);
 
-    publishEntries(2, 1);
+    // Send clientReady message
+    sendClientReady(durableClientVM);
 
-    // Verify the durable client received the updates before failover
+    // put key 2
+    publishEntries(publisherClientVM, 2, 1);
+
+    // Verify the durable client received the 2 total updates
     checkListenerEvents(2, 1, -1, durableClientVM);
 
-    durableClientVM.invoke("Get", () -> {
-      await().untilAsserted(() -> {
-        Region<Object, Object> region = getCache().getRegion(regionName);
-        assertThat(region).isNotNull();
+    // Verify that the 0 entry is not present, but that 2 is.
+    durableClientVM.invoke("Get", () -> await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+      Region<Object, Object> region = getCache().getRegion(regionName);
+      assertThat(region).isNotNull();
 
-        assertThat(region.getEntry("0")).isNull();
-        assertThat(region.getEntry("2")).isNotNull();
-      });
-    });
+      assertThat(region.getEntry("0")).isNull();
+      assertThat(region.getEntry("2")).isNotNull();
+    }));
 
     // Stop server 1
     server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
@@ -535,7 +556,8 @@ public class DurableClientTestCase extends DurableClientTestBase {
       server2VM.invoke(this::verifyClientHasConnected);
     }
 
-    publishEntries(3, 1);
+    // put key 3
+    publishEntries(publisherClientVM, 3, 1);
 
     // Verify the durable client received the updates after failover
     checkListenerEvents(3, 1, -1, durableClientVM);
@@ -550,13 +572,16 @@ public class DurableClientTestCase extends DurableClientTestBase {
     server2VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
   }
 
+  /**
+   * wait unit we have the required number of events in the queue
+   */
   private void waitUntilQueueContainsRequiredNumberOfEvents(final VM vm,
       final int requiredEntryCount) {
-    vm.invoke(new CacheSerializableRunnable("Verify durable client") {
+    vm.invoke("Verify durable client", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
 
-        await().until(() -> {
+        await().atMost(30, TimeUnit.SECONDS).until(() -> {
           CacheClientProxy proxy = getClientProxy();
           if (proxy == null) {
             return false;
@@ -571,11 +596,10 @@ public class DurableClientTestCase extends DurableClientTestBase {
 
   private void durableFailoverAfterReconnect(int redundancyLevel) {
     // Start server 1
-    server1Port = server1VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, true));
+    server1Port = server1VM.invoke(() -> createCacheServer(regionName, true));
 
-    int server2Port = server2VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, true));
+    // start server 2
+    final int server2Port = server2VM.invoke(() -> createCacheServer(regionName, true));
 
     // Start a durable client
     durableClientId = getName() + "_client";
@@ -590,73 +614,90 @@ public class DurableClientTestCase extends DurableClientTestBase {
     }
 
     durableClientVM.invoke(CacheServerTestUtil::disableShufflingOfEndpoints);
-    durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(clientPool, regionName,
+    // Create the durable client cache (type = cache)
+    durableClientVM.invoke(() -> createCacheClient(clientPool, regionName,
         getClientDistributedSystemProperties(durableClientId, VERY_LONG_DURABLE_TIMEOUT_SECONDS),
-        Boolean.TRUE));
+        true));
+
+    // Register interest in all entries in the region
+    registerInterest(durableClientVM, regionName, true, InterestResultPolicy.NONE);
 
     // Send clientReady message
-    durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+    durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable() {
       @Override
       public void run2() throws CacheException {
-        CacheServerTestUtil.getCache().readyForEvents();
+        getCache().readyForEvents();
       }
     });
 
-    registerInterest(durableClientVM, regionName, true, InterestResultPolicy.NONE);
-
+    // Verify the durable client is connected to the servers.
+    verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server1VM);
+    if (redundancyLevel == 1) {
+      verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server2VM);
+    }
     // Start normal publisher client
-    publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
+    publisherClientVM.invoke(() -> createCacheClient(
         getClientPool(NetworkUtils.getServerHostName(), server1Port,
             server2Port, false),
         regionName));
 
-    // Publish some entries
-    publishEntries(0, 1);
+    // put key 0
+    publishEntries(publisherClientVM, 0, 1);
 
-    // Verify the durable client received the updates
+    // Verify the durable client received key 0
     checkListenerEvents(1, 1, -1, durableClientVM);
 
-    verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server1VM);
+    // Wait to until the HARegionQueue has emptied before disconnecting the Durable client
+    // to avoid having left over messages that processed, in particular, the key 0 message
+    waitUntilHARegionQueueSizeIsZero(server1VM);
+    if (redundancyLevel == 1) {
+      waitUntilHARegionQueueSizeIsZero(server2VM);
+    }
 
     // Stop the durable client
     disconnectDurableClient(true);
 
-    // Stop server 1 - publisher will put 10 entries during shutdown/primary identification
+    // Stop server 1 - publisher will put 1 entries during shutdown/primary identification
     server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
 
-    // Publish updates during client downtime
-    publishEntries(1, 1);
+    // Put key 1 during client downtime
+    publishEntries(publisherClientVM, 1, 1);
 
     // Re-start the durable client that is kept alive on the server
-    restartDurableClient(VERY_LONG_DURABLE_TIMEOUT_SECONDS, clientPool, Boolean.TRUE);
+    restartDurableClient(VERY_LONG_DURABLE_TIMEOUT_SECONDS, clientPool, true);
 
+    // Re-register interest in all entries in the region
     registerInterest(durableClientVM, regionName, true, InterestResultPolicy.NONE);
 
-    publishEntries(2, 2);
+    // Send clientReady message
+    sendClientReady(durableClientVM);
+
+    // Put keys 2 and 3
+    publishEntries(publisherClientVM, 2, 2);
 
     // Verify the durable client received the updates before failover
     if (redundancyLevel == 1) {
-      checkListenerEvents(4, 1, -1, durableClientVM);
+      checkListenerEvents(3, 1, -1, durableClientVM);
     } else {
-      checkListenerEvents(2, 1, -1, durableClientVM);
+      checkListenerEvents(1, 1, -1, durableClientVM);
     }
 
-    durableClientVM.invoke("Get", () -> {
-      await().untilAsserted(() -> {
-        Region<Object, Object> region = getCache().getRegion(regionName);
-        assertThat(region).isNotNull();
-        // Register interest in all keys
-        assertThat(region.getEntry("0")).isNull();
-      });
-    });
+    // Verify that key 0 is not present
+    durableClientVM.invoke("Get", () -> await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+      Region<Object, Object> region = getCache().getRegion(regionName);
+      assertThat(region).isNotNull();
 
-    publishEntries(4, 1);
+      assertThat(region.getEntry("0")).isNull();
+    }));
+
+    // put key 4
+    publishEntries(publisherClientVM, 4, 1);
 
     // Verify the durable client received the updates after failover
     if (redundancyLevel == 1) {
-      checkListenerEvents(5, 1, -1, durableClientVM);
+      checkListenerEvents(4, 1, -1, durableClientVM);
     } else {
-      checkListenerEvents(3, 1, -1, durableClientVM);
+      checkListenerEvents(2, 1, -1, durableClientVM);
     }
 
     // Stop the durable client
@@ -669,8 +710,9 @@ public class DurableClientTestCase extends DurableClientTestBase {
     server2VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
   }
 
+
   private void verifyClientHasConnected() {
-    CacheServer cacheServer = CacheServerTestUtil.getCache().getCacheServers().get(0);
+    CacheServer cacheServer = getCache().getCacheServers().get(0);
     CacheClientNotifier ccn =
         ((InternalCacheServer) cacheServer).getAcceptor().getCacheClientNotifier();
     await().until(() -> ccn.getClientProxies().size() == 1);
diff --git a/geode-dunit/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerTestUtil.java b/geode-dunit/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerTestUtil.java
index 1c6801abe2..57cc3c42c4 100755
--- a/geode-dunit/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerTestUtil.java
+++ b/geode-dunit/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerTestUtil.java
@@ -22,8 +22,8 @@ import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE;
 import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
-import static org.apache.geode.test.dunit.Assert.assertNotNull;
-import static org.apache.geode.test.dunit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 import java.io.File;
 import java.net.InetSocketAddress;
@@ -94,8 +94,7 @@ public class CacheServerTestUtil extends JUnit4DistributedTestCase {
     createClientCache(poolAttr, regionName, getClientProperties());
   }
 
-  public static void createClientCache(Pool poolAttr, String regionName, Properties dsProperties)
-      throws Exception {
+  public static void createClientCache(Pool poolAttr, String regionName, Properties dsProperties) {
     ClientCacheFactory ccf = new ClientCacheFactory(dsProperties);
     if (poolAttr != null) {
       ccf.setPoolFreeConnectionTimeout(poolAttr.getFreeConnectionTimeout())
@@ -131,22 +130,10 @@ public class CacheServerTestUtil extends JUnit4DistributedTestCase {
     pool = (PoolImpl) cc.getDefaultPool();
   }
 
-  public static void createPool(PoolAttributes poolAttr) throws Exception {
-    Properties props = new Properties();
-    props.setProperty(MCAST_PORT, "0");
-    props.setProperty(LOCATORS, "");
-
-    DistributedSystem ds = new CacheServerTestUtil().getSystem(props);
-
+  public static void createPool(PoolAttributes poolAttr) {
     PoolFactoryImpl pf = (PoolFactoryImpl) PoolManager.createFactory();
     pf.init(poolAttr);
-    PoolImpl p = (PoolImpl) pf.create("CacheServerTestUtil");
-    AttributesFactory factory = new AttributesFactory();
-    factory.setScope(Scope.LOCAL);
-    factory.setPoolName(p.getName());
-
-    RegionAttributes attrs = factory.create();
-    pool = p;
+    pool = (PoolImpl) pf.create("CacheServerTestUtil");
   }
 
   public static void createCacheClient(Pool poolAttr, String regionName, Properties dsProperties,
@@ -155,12 +142,12 @@ public class CacheServerTestUtil extends JUnit4DistributedTestCase {
   }
 
   public static void createCacheClient(Pool poolAttr, String regionName, Properties dsProperties,
-      Boolean addControlListener, Properties javaSystemProperties) throws Exception {
+      Boolean addControlListener, Properties javaSystemProperties) {
     new CacheServerTestUtil().createCache(dsProperties);
     IgnoredException.addIgnoredException("java.net.ConnectException||java.net.SocketException");
 
     if (javaSystemProperties != null && javaSystemProperties.size() > 0) {
-      Enumeration e = javaSystemProperties.propertyNames();
+      Enumeration<?> e = javaSystemProperties.propertyNames();
 
       while (e.hasMoreElements()) {
         String key = (String) e.nextElement();
@@ -171,20 +158,20 @@ public class CacheServerTestUtil extends JUnit4DistributedTestCase {
     PoolFactoryImpl pf = (PoolFactoryImpl) PoolManager.createFactory();
     pf.init(poolAttr);
     PoolImpl p = (PoolImpl) pf.create("CacheServerTestUtil");
-    AttributesFactory factory = new AttributesFactory();
+    AttributesFactory<Object, Object> factory = new AttributesFactory<>();
     factory.setScope(Scope.LOCAL);
     factory.setPoolName(p.getName());
     if (addControlListener) {
       factory.addCacheListener(new ControlListener());
     }
-    RegionAttributes attrs = factory.create();
+    RegionAttributes<Object, Object> attrs = factory.create();
     cache.createRegion(regionName, attrs);
     pool = p;
   }
 
   public static void unsetJavaSystemProperties(Properties javaSystemProperties) {
     if (javaSystemProperties != null && javaSystemProperties.size() > 0) {
-      Enumeration e = javaSystemProperties.propertyNames();
+      Enumeration<?> e = javaSystemProperties.propertyNames();
 
       while (e.hasMoreElements()) {
         String key = (String) e.nextElement();
@@ -193,16 +180,15 @@ public class CacheServerTestUtil extends JUnit4DistributedTestCase {
     }
   }
 
-  public static void createCacheClient(Pool poolAttr, String regionName1, String regionName2)
-      throws Exception {
+  public static void createCacheClient(Pool poolAttr, String regionName1, String regionName2) {
     new CacheServerTestUtil().createCache(getClientProperties());
     PoolFactoryImpl pf = (PoolFactoryImpl) PoolManager.createFactory();
     pf.init(poolAttr);
     PoolImpl p = (PoolImpl) pf.create("CacheServerTestUtil");
-    AttributesFactory factory = new AttributesFactory();
+    AttributesFactory<Object, Object> factory = new AttributesFactory<>();
     factory.setScope(Scope.LOCAL);
     factory.setPoolName(p.getName());
-    RegionAttributes attrs = factory.create();
+    RegionAttributes<Object, Object> attrs = factory.create();
     cache.createRegion(regionName1, attrs);
     cache.createRegion(regionName2, attrs);
     pool = p;
@@ -284,21 +270,21 @@ public class CacheServerTestUtil extends JUnit4DistributedTestCase {
    * Create client regions
    */
   public static void createCacheClients(Pool poolAttr, String regionName1, String regionName2,
-      Properties dsProperties) throws Exception {
+      Properties dsProperties) {
     new CacheServerTestUtil().createCache(dsProperties);
 
     // Initialize region1
     PoolFactoryImpl pf = (PoolFactoryImpl) PoolManager.createFactory();
     pf.init(poolAttr);
     Pool p = pf.create("CacheServerTestUtil1");
-    AttributesFactory factory1 = new AttributesFactory();
+    AttributesFactory<Object, Object> factory1 = new AttributesFactory<>();
     factory1.setScope(Scope.LOCAL);
     factory1.setPoolName(p.getName());
     cache.createRegion(regionName1, factory1.create());
 
     // Initialize region2
     p = pf.create("CacheServerTestUtil2");
-    AttributesFactory factory2 = new AttributesFactory();
+    AttributesFactory<Object, Object> factory2 = new AttributesFactory<>();
     factory2.setScope(Scope.LOCAL);
     factory2.setPoolName(p.getName());
     cache.createRegion(regionName2, factory2.create());
@@ -338,11 +324,11 @@ public class CacheServerTestUtil extends JUnit4DistributedTestCase {
     props.setProperty(MCAST_PORT, "0");
     props.setProperty(LOCATORS, "localhost[" + DistributedTestUtils.getDUnitLocatorPort() + "]");
     new CacheServerTestUtil().createCache(props);
-    AttributesFactory factory = new AttributesFactory();
+    AttributesFactory<Object, Object> factory = new AttributesFactory<>();
     factory.setScope(Scope.DISTRIBUTED_ACK);
     factory.setEnableBridgeConflation(true);
     factory.setDataPolicy(DataPolicy.REPLICATE);
-    RegionAttributes attrs = factory.create();
+    RegionAttributes<Object, Object> attrs = factory.create();
     cache.createRegion(regionName, attrs);
     CacheServer server = cache.addCacheServer();
     server.setPort(serverPort);
@@ -353,11 +339,11 @@ public class CacheServerTestUtil extends JUnit4DistributedTestCase {
   public static Integer createCacheServer(String regionName1, String regionName2,
       Boolean notifyBySubscription) throws Exception {
     new CacheServerTestUtil().createCache(new Properties());
-    AttributesFactory factory = new AttributesFactory();
+    AttributesFactory<Object, Object> factory = new AttributesFactory<>();
     factory.setScope(Scope.DISTRIBUTED_ACK);
     factory.setEnableBridgeConflation(true);
     factory.setDataPolicy(DataPolicy.REPLICATE);
-    RegionAttributes attrs = factory.create();
+    RegionAttributes<Object, Object> attrs = factory.create();
     if (!regionName1.equals("")) {
       cache.createRegion(regionName1, attrs);
     }
@@ -372,23 +358,23 @@ public class CacheServerTestUtil extends JUnit4DistributedTestCase {
     return server1.getPort();
   }
 
-  private void createCache(Properties props) throws Exception {
+  private void createCache(Properties props) {
     DistributedSystem ds = getSystem(props);
-    assertNotNull(ds);
+    assertThat(ds).isNotNull();
     ds.disconnect();
     ds = getSystem(props);
     cache = CacheFactory.create(ds);
-    assertNotNull(cache);
+    assertThat(cache).isNotNull();
   }
 
-  private void createClientCache(Properties props, ClientCacheFactory ccf) throws Exception {
+  private void createClientCache(Properties props, ClientCacheFactory ccf) {
     DistributedSystem ds = getSystem(props);
-    assertNotNull(ds);
+    assertThat(ds).isNotNull();
     ds.disconnect();
     ClientCache cc = ccf.create();
     setSystem(props, cc.getDistributedSystem());
     cache = (Cache) cc;
-    assertNotNull(cache);
+    assertThat(cache).isNotNull();
     expected = IgnoredException
         .addIgnoredException("java.net.ConnectionException||java.net.SocketException");
   }
@@ -491,10 +477,10 @@ public class CacheServerTestUtil extends JUnit4DistributedTestCase {
   }
 
   public static class ControlListener extends CacheListenerAdapter implements Declarable {
-    public final LinkedList<EventWrapper> events = new LinkedList();
-    public final LinkedList<EntryEvent> createEvents = new LinkedList();
-    public final LinkedList<EntryEvent> updateEvents = new LinkedList();
-    public final LinkedList<EntryEvent> destroyEvents = new LinkedList();
+    public final LinkedList<EventWrapper> events = new LinkedList<>();
+    public final LinkedList<EntryEvent> createEvents = new LinkedList<>();
+    public final LinkedList<EntryEvent> updateEvents = new LinkedList<>();
+    public final LinkedList<EntryEvent> destroyEvents = new LinkedList<>();
     public final Object CONTROL_LOCK = new Object();
 
     // added to test creation of cache from xml
@@ -509,6 +495,11 @@ public class CacheServerTestUtil extends JUnit4DistributedTestCase {
       return waitWhileNotEnoughEvents(sleepMs, eventCount, getEvents(eventType));
     }
 
+
+    /**
+     * This method is not implemented to test event count matches the eventsToCheck.size() which is
+     * confusing. It will wait and return if it got something in the eventsToCheck or not.
+     */
     public boolean waitWhileNotEnoughEvents(long sleepMs, int eventCount, List eventsToCheck) {
       long maxMillis = System.currentTimeMillis() + sleepMs;
       synchronized (CONTROL_LOCK) {
@@ -528,7 +519,7 @@ public class CacheServerTestUtil extends JUnit4DistributedTestCase {
     }
 
     public List getEvents(int eventType) {
-      List eventsToCheck = null;
+      List eventsToCheck;
       switch (eventType) {
         case TYPE_CREATE:
           eventsToCheck = createEvents;