You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2022/04/14 15:45:57 UTC

[geode] 03/05: GEODE-8228: Await on stats.

This is an automated email from the ASF dual-hosted git repository.

jbarrett pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 7514b7c53c6fee87896badec387158ba85b8a4c8
Author: Jacob Barrett <jb...@pivotal.io>
AuthorDate: Fri Apr 8 14:50:11 2022 -0700

    GEODE-8228: Await on stats.
---
 .../geode/internal/cache/wan/WANTestBase.java      |  81 ++-
 ...rallelGatewaySenderAlertThresholdDUnitTest.java |  41 +-
 .../ParallelGatewaySenderOperationsDUnitTest.java  |  22 +-
 .../parallel/ParallelWANConflationDUnitTest.java   | 101 ++-
 .../wan/parallel/ParallelWANStatsDUnitTest.java    | 745 +++++++++++----------
 .../serial/SerialGatewaySenderQueueDUnitTest.java  |  14 +-
 .../wan/serial/SerialWANConflationDUnitTest.java   |  54 +-
 .../cache/wan/serial/SerialWANStatsDUnitTest.java  |  42 +-
 8 files changed, 544 insertions(+), 556 deletions(-)

diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 62f4ca9749..16ea5f6b91 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -1159,11 +1159,8 @@ public class WANTestBase extends DistributedTestCase {
   public static void checkQueueSizeInStats(String senderId, final int expectedQueueSize) {
     AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
     GatewaySenderStats statistics = sender.getStatistics();
-    await()
-        .untilAsserted(() -> assertThat(statistics.getEventQueueSize()).as(
-            "Expected queue size: " + expectedQueueSize
-                + " but actual size: " + statistics.getEventQueueSize())
-            .isEqualTo(expectedQueueSize));
+    await().untilAsserted(() -> assertThat(statistics.getEventQueueSize())
+        .isEqualTo(expectedQueueSize));
   }
 
   public static void checkConnectionStats(String senderId) {
@@ -1186,11 +1183,7 @@ public class WANTestBase extends DistributedTestCase {
             (ConcurrentParallelGatewaySenderQueue) regionQueue;
         parallelGatewaySenderQueue.getRegions();
       }
-      await()
-          .untilAsserted(() -> assertThat(regionQueue.size()).as(
-              "Expected queue entries: " + expectedQueueSize
-                  + " but actual entries: " + regionQueue.size())
-              .isEqualTo(expectedQueueSize));
+      await().untilAsserted(() -> assertThat(regionQueue.size()).isEqualTo(expectedQueueSize));
     }
     ArrayList<Integer> stats = new ArrayList<>();
     stats.add(statistics.getEventQueueSize());
@@ -1237,10 +1230,12 @@ public class WANTestBase extends DistributedTestCase {
   public static void checkQueueStats(String senderId, final int queueSize, final int eventsReceived,
       final int eventsQueued, final int eventsDistributed) {
     GatewaySenderStats statistics = getGatewaySenderStats(senderId);
-    assertThat(statistics.getEventQueueSize()).isEqualTo(queueSize);
-    assertThat(statistics.getEventsReceived()).isEqualTo(eventsReceived);
-    assertThat(statistics.getEventsQueued()).isEqualTo(eventsQueued);
-    assert (statistics.getEventsDistributed() >= eventsDistributed);
+    await().untilAsserted(() -> {
+      assertThat(statistics.getEventQueueSize()).isEqualTo(queueSize);
+      assertThat(statistics.getEventsReceived()).isEqualTo(eventsReceived);
+      assertThat(statistics.getEventsQueued()).isEqualTo(eventsQueued);
+      assertThat(statistics.getEventsDistributed()).isGreaterThanOrEqualTo(eventsDistributed);
+    });
   }
 
   public static void checkGatewayReceiverStats(int processBatches, int eventsReceived,
@@ -1322,19 +1317,23 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void checkEventFilteredStats(String senderId, final int eventsFiltered) {
     GatewaySenderStats statistics = getGatewaySenderStats(senderId);
-    assertThat(statistics.getEventsFiltered()).isEqualTo(eventsFiltered);
+    await()
+        .untilAsserted(() -> assertThat(statistics.getEventsFiltered()).isEqualTo(eventsFiltered));
   }
 
   public static void checkConflatedStats(String senderId, final int eventsConflated) {
     GatewaySenderStats statistics = getGatewaySenderStats(senderId);
-    assertThat(statistics.getEventsNotQueuedConflated()).isEqualTo(eventsConflated);
+    await().untilAsserted(
+        () -> assertThat(statistics.getEventsNotQueuedConflated()).isEqualTo(eventsConflated));
   }
 
   public static void checkStats_Failover(String senderId, final int eventsReceived) {
     GatewaySenderStats statistics = getGatewaySenderStats(senderId);
-    assertThat(statistics.getEventsReceived()).isEqualTo(eventsReceived);
-    assertThat((statistics.getEventsQueued() + statistics.getUnprocessedTokensAddedByPrimary()
-        + statistics.getUnprocessedEventsRemovedByPrimary())).isEqualTo(eventsReceived);
+    await().untilAsserted(() -> {
+      assertThat(statistics.getEventsReceived()).isEqualTo(eventsReceived);
+      assertThat((statistics.getEventsQueued() + statistics.getUnprocessedTokensAddedByPrimary()
+          + statistics.getUnprocessedEventsRemovedByPrimary())).isEqualTo(eventsReceived);
+    });
   }
 
   public static void checkBatchStats(String senderId, final int batches) {
@@ -1343,38 +1342,46 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void checkBatchStats(String senderId, final int batches, boolean isExact) {
     GatewaySenderStats statistics = getGatewaySenderStats(senderId);
-    if (isExact) {
-      assert (statistics.getBatchesDistributed() == batches);
-    } else {
-      assert (statistics.getBatchesDistributed() >= batches);
-    }
-    assertThat(statistics.getBatchesRedistributed()).isEqualTo(0);
+    await().untilAsserted(() -> {
+      if (isExact) {
+        assertThat(statistics.getBatchesDistributed()).isEqualTo(batches);
+      } else {
+        assertThat(statistics.getBatchesDistributed()).isGreaterThanOrEqualTo(batches);
+      }
+      assertThat(statistics.getBatchesRedistributed()).isEqualTo(0);
+    });
   }
 
   public static void checkBatchStats(String senderId, final int batches,
       boolean isExact, final boolean batchesRedistributed) {
     GatewaySenderStats statistics = getGatewaySenderStats(senderId);
-    if (isExact) {
-      assert (statistics.getBatchesDistributed() == batches);
-    } else {
-      assert (statistics.getBatchesDistributed() >= batches);
-    }
-    assertThat((statistics.getBatchesRedistributed() > 0)).isEqualTo(batchesRedistributed);
+    await().untilAsserted(() -> {
+      if (isExact) {
+        assertThat(statistics.getBatchesDistributed()).isEqualTo(batches);
+      } else {
+        assertThat(statistics.getBatchesDistributed()).isGreaterThanOrEqualTo(batches);
+      }
+      assertThat((statistics.getBatchesRedistributed() > 0)).isEqualTo(batchesRedistributed);
+    });
   }
 
   public static void checkBatchStats(String senderId, final boolean batchesDistributed,
       final boolean batchesRedistributed) {
     GatewaySenderStats statistics = getGatewaySenderStats(senderId);
-    assertThat((statistics.getBatchesDistributed() > 0)).isEqualTo(batchesDistributed);
-    assertThat((statistics.getBatchesRedistributed() > 0)).isEqualTo(batchesRedistributed);
+    await().untilAsserted(() -> {
+      assertThat((statistics.getBatchesDistributed() > 0)).isEqualTo(batchesDistributed);
+      assertThat((statistics.getBatchesRedistributed() > 0)).isEqualTo(batchesRedistributed);
+    });
   }
 
   public static void checkUnProcessedStats(String senderId, int events) {
     GatewaySenderStats statistics = getGatewaySenderStats(senderId);
-    assertThat((statistics.getUnprocessedEventsAddedBySecondary()
-        + statistics.getUnprocessedTokensRemovedBySecondary())).isEqualTo(events);
-    assertThat((statistics.getUnprocessedEventsRemovedByPrimary()
-        + statistics.getUnprocessedTokensAddedByPrimary())).isEqualTo(events);
+    await().untilAsserted(() -> {
+      assertThat((statistics.getUnprocessedEventsAddedBySecondary()
+          + statistics.getUnprocessedTokensRemovedBySecondary())).isEqualTo(events);
+      assertThat((statistics.getUnprocessedEventsRemovedByPrimary()
+          + statistics.getUnprocessedTokensAddedByPrimary())).isEqualTo(events);
+    });
   }
 
   public static GatewaySenderStats getGatewaySenderStats(String senderId) {
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderAlertThresholdDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderAlertThresholdDUnitTest.java
index 2b0c332e4a..f7e2f77f85 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderAlertThresholdDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderAlertThresholdDUnitTest.java
@@ -17,7 +17,7 @@ package org.apache.geode.internal.cache.wan.parallel;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.util.ArrayList;
+import java.util.List;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -86,17 +86,16 @@ public class ParallelGatewaySenderAlertThresholdDUnitTest extends WANTestBase {
     vm2.invoke(serializableRunnableIF);
     vm3.invoke(serializableRunnableIF);
 
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
-    ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
-    ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1));
-    ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1));
 
-    assertThat((v4List.get(12) + v5List.get(12) + v6List.get(12) + v7List.get(12)) > 0).as(
-        "GatewaySenders Stats should contain number of EventsExceedingAlertThreshold > 0").isTrue();
+      assertThat((v4List.get(12) + v5List.get(12) + v6List.get(12) + v7List.get(12)) > 0).as(
+          "GatewaySenders Stats should contain number of EventsExceedingAlertThreshold > 0")
+          .isTrue();
+    });
 
     int v4alert = vm4.invoke(
         ParallelGatewaySenderAlertThresholdDUnitTest::checkSenderMBeanAlertThreshold);
@@ -161,18 +160,16 @@ public class ParallelGatewaySenderAlertThresholdDUnitTest extends WANTestBase {
     vm2.invoke(serializableRunnableIF);
     vm3.invoke(serializableRunnableIF);
 
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
-    ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
-    ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1));
-    ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1));
 
-    assertThat(0).as(
-        "GatewaySenders Stats should contain number of EventsExceedingAlertThreshold = 0")
-        .isEqualTo((v4List.get(12) + v5List.get(12) + v6List.get(12) + v7List.get(12)));
+      assertThat(0).as(
+          "GatewaySenders Stats should contain number of EventsExceedingAlertThreshold = 0")
+          .isEqualTo((v4List.get(12) + v5List.get(12) + v6List.get(12) + v7List.get(12)));
+    });
 
     int v4alert = vm4.invoke(
         ParallelGatewaySenderAlertThresholdDUnitTest::checkSenderMBeanAlertThreshold);
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index a9fe2e04c3..78e2aa2360 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -479,14 +479,10 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
       startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
 
       async.join();
-      ArrayList<Integer> vm4List =
-          (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
-      ArrayList<Integer> vm5List =
-          (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
-      ArrayList<Integer> vm6List =
-          (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
-      ArrayList<Integer> vm7List =
-          (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
+      List<Integer> vm4List = vm4.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
+      List<Integer> vm5List = vm5.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
+      List<Integer> vm6List = vm6.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
+      List<Integer> vm7List = vm7.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
       if (vm4List.get(0) + vm5List.get(0) + vm6List.get(0) + vm7List.get(0) > 0) {
         foundEventsDroppedDueToPrimarySenderNotRunning = true;
       }
@@ -1258,12 +1254,10 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
             .isEqualTo(100);
 
     await().untilAsserted(() -> {
-      int vm4SecondarySize = vm4.invoke(() -> getSecondaryQueueSizeInStats("ln"));
-      int vm5SecondarySize = vm5.invoke(() -> getSecondaryQueueSizeInStats("ln"));
-      int vm6SecondarySize = vm6.invoke(() -> getSecondaryQueueSizeInStats("ln"));
-      int vm7SecondarySize = vm7.invoke(() -> getSecondaryQueueSizeInStats("ln"));
-      assertThat(vm4SecondarySize + vm5SecondarySize + vm6SecondarySize + vm7SecondarySize)
-          .isEqualTo(0);
+      assertThat(vm4.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero();
+      assertThat(vm5.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero();
+      assertThat(vm6.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero();
+      assertThat(vm7.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero();
     });
 
   }
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
index e7dafb0e95..88fa8a43b3 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
@@ -17,8 +17,8 @@ package org.apache.geode.internal.cache.wan.parallel;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
@@ -120,26 +120,26 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
     vm6.invoke(() -> enableConflation("ln"));
     vm7.invoke(() -> enableConflation("ln"));
 
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 100));
-    ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 100));
-    ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 100));
-    ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 100));
-    assertThat((v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 100).as(
-        "Event in secondary queue should be 100").isTrue();
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 100));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 100));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 100));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 100));
+      assertThat((v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 100).as(
+          "Event in secondary queue should be 100").isTrue();
+    });
 
     resumeSenders();
 
-    v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
 
-    assertThat((v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0).as(
-        "No events conflated in batch").isTrue();
+      assertThat((v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0).as(
+          "No events conflated in batch").isTrue();
+    });
 
     verifySecondaryEventQueuesDrained();
     vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10));
@@ -148,15 +148,10 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
 
   private void verifySecondaryEventQueuesDrained() {
     await().untilAsserted(() -> {
-      int vm4SecondarySize = vm4.invoke(() -> getSecondaryQueueSizeInStats("ln"));
-      int vm5SecondarySize = vm5.invoke(() -> getSecondaryQueueSizeInStats("ln"));
-      int vm6SecondarySize = vm6.invoke(() -> getSecondaryQueueSizeInStats("ln"));
-      int vm7SecondarySize = vm7.invoke(() -> getSecondaryQueueSizeInStats("ln"));
-
-      assertThat(vm4SecondarySize + vm5SecondarySize + vm6SecondarySize + vm7SecondarySize).as(
-          "Event in secondary queue should be 0 after dispatched, but actual is " + vm4SecondarySize
-              + ":" + vm5SecondarySize + ":" + vm6SecondarySize + ":" + vm7SecondarySize)
-          .isEqualTo(0);
+      assertThat(vm4.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero();
+      assertThat(vm5.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero();
+      assertThat(vm6.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero();
+      assertThat(vm7.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero();
     });
   }
 
@@ -217,38 +212,34 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
   }
 
   private void validateSecondaryEventQueueSize(int expectedNum, int redundancy) {
-    ArrayList<Integer> vm4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
-    ArrayList<Integer> vm5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
-    ArrayList<Integer> vm6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
-    ArrayList<Integer> vm7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
-    assertThat(
-        (vm4List.get(10) + vm5List.get(10) + vm6List.get(10) + vm7List.get(10)) == expectedNum
-            * redundancy).as(
-                "Event in secondary queue should be " + (expectedNum * redundancy) + ", but is "
-                    + (vm4List.get(10) + vm5List.get(10) + vm6List.get(10) + vm7List.get(10)))
-                .isTrue();
+    await().untilAsserted(() -> {
+      List<Integer> vm4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+      List<Integer> vm5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+      List<Integer> vm6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+      List<Integer> vm7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+      assertThat(
+          (vm4List.get(10) + vm5List.get(10) + vm6List.get(10) + vm7List.get(10)) == expectedNum
+              * redundancy).as(
+                  "Event in secondary queue should be " + (expectedNum * redundancy) + ", but is "
+                      + (vm4List.get(10) + vm5List.get(10) + vm6List.get(10) + vm7List.get(10)))
+                  .isTrue();
+    });
   }
 
   private void validateEventsProcessedByPQRM(int expectedNum, int redundancy) {
-    ArrayList<Integer> vm4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> vm5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> vm6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> vm7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    assertThat(
-        (vm4List.get(11) + vm5List.get(11) + vm6List.get(11) + vm7List.get(11)) == expectedNum
-            * redundancy).as(
-                "Event processed by queue removal message should be " + (expectedNum * redundancy)
-                    + ", but is "
-                    + (vm4List.get(11) + vm5List.get(11) + vm6List.get(11) + vm7List.get(11)))
-                .isTrue();
+    await().untilAsserted(() -> {
+      List<Integer> vm4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> vm5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> vm6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> vm7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      assertThat(
+          (vm4List.get(11) + vm5List.get(11) + vm6List.get(11) + vm7List.get(11)) == expectedNum
+              * redundancy).as(
+                  "Event processed by queue removal message should be " + (expectedNum * redundancy)
+                      + ", but is "
+                      + (vm4List.get(11) + vm5List.get(11) + vm6List.get(11) + vm7List.get(11)))
+                  .isTrue();
+    });
   }
 
   @Test
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index 79f7afb438..ce7eb03b8b 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -102,38 +102,41 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
     createReceiverPR(vm2, 1);
     putKeyValues();
 
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
-    ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
-    ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
-    ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
-
-    assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(NUM_PUTS); // queue
-                                                                                                   // size
-    assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(
-        NUM_PUTS * 2); // eventsReceived
-    assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(
-        NUM_PUTS * 2); // events queued
-    assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(0); // events
-                                                                                            // distributed
-    assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(
-        NUM_PUTS); // secondary queue size
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+
+      // queue size
+      assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(NUM_PUTS);
+      // eventsReceived
+      assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(
+          NUM_PUTS * 2);
+      // events queued
+      assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(
+          NUM_PUTS * 2);
+      // events distributed
+      assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(0);
+      // secondary queue size
+      assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(
+          NUM_PUTS);
+
+      System.out.println("Current secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10)
+          + ":" + v6List.get(10) + ":" + v7List.get(10));
+    });
 
     // stop vm7 to trigger rebalance and move some primary buckets
-    System.out.println("Current secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10)
-        + ":" + v6List.get(10) + ":" + v7List.get(10));
     vm7.invoke(WANTestBase::closeCache);
     await().untilAsserted(() -> {
       int v4secondarySize = vm4.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
       int v5secondarySize = vm5.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
       int v6secondarySize = vm6.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
       assertThat(v4secondarySize + v5secondarySize + v6secondarySize).isEqualTo(NUM_PUTS);
+      System.out
+          .println("New secondary queue sizes:" + v4secondarySize + ":" + v5secondarySize + ":"
+              + v6secondarySize);
     });
-    System.out.println("New secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10) + ":"
-        + v6List.get(10));
 
     vm7.invoke(() -> WANTestBase.createCache(lnPort));
     vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true));
@@ -141,16 +144,17 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
     startSenderInVMs("ln", vm7);
     vm7.invoke(() -> pauseSender("ln"));
 
-    v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
-    v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
-    v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
-    v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
-    assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(
-        NUM_PUTS); // secondary
-    // queue
-    // size
-    System.out.println("After restart vm7, secondary queue sizes:" + v4List.get(10) + ":"
-        + v5List.get(10) + ":" + v6List.get(10) + ":" + v7List.get(10));
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+      // secondary queue size
+      assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(
+          NUM_PUTS);
+      System.out.println("After restart vm7, secondary queue sizes:" + v4List.get(10) + ":"
+          + v5List.get(10) + ":" + v6List.get(10) + ":" + v7List.get(10));
+    });
 
     vm4.invoke(() -> WANTestBase.resumeSender("ln"));
     vm5.invoke(() -> WANTestBase.resumeSender("ln"));
@@ -162,15 +166,17 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
 
     vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));
 
-    v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
-    // events distributed:
-    assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS);
-    // secondary queue size:
-    assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(0);
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+      // events distributed:
+      assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS);
+      // secondary queue size:
+      assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(0);
+    });
   }
 
   // TODO: add a test without redundancy for primary switch
@@ -192,28 +198,26 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
 
     putKeyValues();
 
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
-    ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
-    ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
-    ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
-
-    // queue size:
-    assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(NUM_PUTS);
-    // events received:
-    assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(
-        NUM_PUTS * 2);
-    // events queued:
-    assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(
-        NUM_PUTS * 2);
-    // events distributed:
-    assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(0);
-    // secondary queue size:
-    assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(
-        NUM_PUTS);
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+
+      // queue size:
+      assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(NUM_PUTS);
+      // events received:
+      assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(
+          NUM_PUTS * 2);
+      // events queued:
+      assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(
+          NUM_PUTS * 2);
+      // events distributed:
+      assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(0);
+      // secondary queue size:
+      assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(
+          NUM_PUTS);
+    });
 
     vm4.invoke(() -> WANTestBase.resumeSender("ln"));
     vm5.invoke(() -> WANTestBase.resumeSender("ln"));
@@ -224,15 +228,17 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
 
     vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));
 
-    v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
-    // events distributed:
-    assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS);
-    // secondary queue size:
-    assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(0);
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+      // events distributed:
+      assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS);
+      // secondary queue size:
+      assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(0);
+    });
   }
 
   @Test
@@ -254,27 +260,25 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
 
     putKeyValues();
 
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
-    ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
-    ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
-    ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
-
-    // queue size:
-    assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(NUM_PUTS);
-    // events received:
-    assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(NUM_PUTS);
-    // events queued:
-    assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(NUM_PUTS);
-    // events distributed
-    assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(0);
-    // batches distributed:
-    assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)).isEqualTo(0);
-    // batches redistributed
-    assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+
+      // queue size:
+      assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(NUM_PUTS);
+      // events received:
+      assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(NUM_PUTS);
+      // events queued:
+      assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(NUM_PUTS);
+      // events distributed
+      assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(0);
+      // batches distributed:
+      assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)).isEqualTo(0);
+      // batches redistributed
+      assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+    });
   }
 
   @Test
@@ -297,27 +301,25 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
 
     vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
 
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
-    // queue size:
-    assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
-    // eventsReceived:
-    assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(NUM_PUTS);
-    // events queued:
-    assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(NUM_PUTS);
-    // events distributed:
-    assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS);
-    // batches distributed:
-    assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10).isTrue();
-    // batches redistributed:
-    assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+      // queue size:
+      assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
+      // eventsReceived:
+      assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(NUM_PUTS);
+      // events queued:
+      assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(NUM_PUTS);
+      // events distributed:
+      assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS);
+      // batches distributed:
+      assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10).isTrue();
+      // batches redistributed:
+      assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+    });
 
     vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
   }
@@ -369,27 +371,25 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
     vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, transactions));
     vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, transactions * 3));
 
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
-    // queue size:
-    assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
-    // eventsReceived:
-    assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(entries);
-    // events queued:
-    assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(entries);
-    // events distributed:
-    assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(entries);
-    // batches distributed:
-    assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)).isEqualTo(2);
-    // batches redistributed:
-    assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+      // queue size:
+      assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
+      // eventsReceived:
+      assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(entries);
+      // events queued:
+      assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(entries);
+      // events distributed:
+      assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(entries);
+      // batches distributed:
+      assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)).isEqualTo(2);
+      // batches redistributed:
+      assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+    });
   }
 
   @Test
@@ -496,26 +496,24 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
 
   private void checkQueuesAreEmptyAndOnlyCompleteTransactionsAreReplicated(
       boolean isBatchesRedistributed) {
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
-    // queue size:
-    assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
-    // batches redistributed:
-    int batchesRedistributed = v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5);
-    if (isBatchesRedistributed) {
-      assertThat(batchesRedistributed).isGreaterThan(0);
-    } else {
-      assertThat(batchesRedistributed).isEqualTo(0);
-    }
-    // batches with incomplete transactions
-    assertThat(v4List.get(13) + v5List.get(13) + v6List.get(13) + v7List.get(7)).isEqualTo(0);
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+      // queue size:
+      assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
+      // batches redistributed:
+      int batchesRedistributed = v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5);
+      if (isBatchesRedistributed) {
+        assertThat(batchesRedistributed).isGreaterThan(0);
+      } else {
+        assertThat(batchesRedistributed).isEqualTo(0);
+      }
+      // batches with incomplete transactions
+      assertThat(v4List.get(13) + v5List.get(13) + v6List.get(13) + v7List.get(7)).isEqualTo(0);
+    });
 
     vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
     vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
@@ -577,31 +575,29 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
     vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, transactions));
     vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, transactions * 3));
 
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
-    // queue size:
-    assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
-    // eventsReceived:
-    assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(entries);
-    // events queued:
-    assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(entries);
-    // events distributed:
-    assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(entries);
-    // batches distributed:
-    assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)).isEqualTo(1);
-    // batches redistributed:
-    assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
-    // events not queued conflated:
-    assertThat(v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)).isEqualTo(0);
-    // batches with incomplete transactions
-    assertThat((int) v4List.get(13)).isEqualTo(0);
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+      // queue size:
+      assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
+      // eventsReceived:
+      assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(entries);
+      // events queued:
+      assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(entries);
+      // events distributed:
+      assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(entries);
+      // batches distributed:
+      assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)).isEqualTo(1);
+      // batches redistributed:
+      assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+      // events not queued conflated:
+      assertThat(v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)).isEqualTo(0);
+      // batches with incomplete transactions
+      assertThat((int) v4List.get(13)).isEqualTo(0);
+    });
 
   }
 
@@ -641,29 +637,31 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
 
     vm4.invoke(() -> WANTestBase.validateRegionSize(testName, entries));
 
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
     // The number of batches will be 4 because each
     // dispatcher thread (there are 2) will send half the number of entries,
     // each on 2 batches.
-    int batches = 4;
-    // queue size:
-    assertThat((int) v4List.get(0)).isEqualTo(0);
-    // eventsReceived:
-    assertThat((int) v4List.get(1)).isEqualTo(entries);
-    // events queued:
-    assertThat((int) v4List.get(2)).isEqualTo(entries);
-    // events distributed:
-    assertThat((int) v4List.get(3)).isEqualTo(entries);
-    // batches distributed:
-    assertThat((int) v4List.get(4)).isEqualTo(batches);
-    // batches redistributed:
-    assertThat((int) v4List.get(5)).isEqualTo(0);
-    // events not queued conflated:
-    assertThat((int) v4List.get(7)).isEqualTo(0);
-    // batches with incomplete transactions
-    assertThat((int) v4List.get(13)).isEqualTo(batches);
+    final int batches = 4;
+
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+      // queue size:
+      assertThat((int) v4List.get(0)).isEqualTo(0);
+      // eventsReceived:
+      assertThat((int) v4List.get(1)).isEqualTo(entries);
+      // events queued:
+      assertThat((int) v4List.get(2)).isEqualTo(entries);
+      // events distributed:
+      assertThat((int) v4List.get(3)).isEqualTo(entries);
+      // batches distributed:
+      assertThat((int) v4List.get(4)).isEqualTo(batches);
+      // batches redistributed:
+      assertThat((int) v4List.get(5)).isEqualTo(0);
+      // events not queued conflated:
+      assertThat((int) v4List.get(7)).isEqualTo(0);
+      // batches with incomplete transactions
+      assertThat((int) v4List.get(13)).isEqualTo(batches);
+    });
 
     vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(batches, entries, entries));
   }
@@ -718,21 +716,22 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
 
     createReceiverInVMs(vm2);
 
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
-    // queue size:
-    assertThat((int) v4List.get(0)).isEqualTo(0);
-    // events received:
-    assertThat((int) v4List.get(1)).isEqualTo(entries);
-    // events queued:
-    assertThat((int) v4List.get(2)).isEqualTo(entries);
-    // events distributed:
-    assertThat((int) v4List.get(3)).isEqualTo(entries);
-    // batches distributed:
-    assertThat((int) v4List.get(4)).isEqualTo(3);
-    // batches redistributed:
-    assertThat((v4List.get(5)) > 0).as("Batch was not redistributed").isTrue();
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+      // queue size:
+      assertThat((int) v4List.get(0)).isEqualTo(0);
+      // events received:
+      assertThat((int) v4List.get(1)).isEqualTo(entries);
+      // events queued:
+      assertThat((int) v4List.get(2)).isEqualTo(entries);
+      // events distributed:
+      assertThat((int) v4List.get(3)).isEqualTo(entries);
+      // batches distributed:
+      assertThat((int) v4List.get(4)).isEqualTo(3);
+      // batches redistributed:
+      assertThat((v4List.get(5)) > 0).as("Batch was not redistributed").isTrue();
+    });
   }
 
   @Test
@@ -793,23 +792,24 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
 
     createReceiverInVMs(vm2);
 
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
-    // queue size:
-    assertThat((int) v4List.get(0)).isEqualTo(0);
-    // events received:
-    assertThat((int) v4List.get(1)).isEqualTo(entries);
-    // events queued:
-    assertThat((int) v4List.get(2)).isEqualTo(entries);
-    // events distributed:
-    assertThat((int) v4List.get(3)).isEqualTo(entries);
-    // batches distributed:
-    assertThat((int) v4List.get(4)).isEqualTo(2);
-    // batches redistributed:
-    assertThat((v4List.get(5)) > 0).as("Batch was not redistributed").isTrue();
-    // events not queued conflated:
-    assertThat((int) v4List.get(7)).isEqualTo(0);
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+      // queue size:
+      assertThat((int) v4List.get(0)).isEqualTo(0);
+      // events received:
+      assertThat((int) v4List.get(1)).isEqualTo(entries);
+      // events queued:
+      assertThat((int) v4List.get(2)).isEqualTo(entries);
+      // events distributed:
+      assertThat((int) v4List.get(3)).isEqualTo(entries);
+      // batches distributed:
+      assertThat((int) v4List.get(4)).isEqualTo(2);
+      // batches redistributed:
+      assertThat((v4List.get(5)) > 0).as("Batch was not redistributed").isTrue();
+      // events not queued conflated:
+      assertThat((int) v4List.get(7)).isEqualTo(0);
+    });
   }
 
   @Test
@@ -832,27 +832,25 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
 
     vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
 
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
-    // queue size:
-    assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
-    // events received:
-    assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(400);
-    // events queued:
-    assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(400);
-    // events distributed
-    assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS);
-    // batches distributed:
-    assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10).isTrue();
-    // batches redistributed:
-    assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+      // queue size:
+      assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
+      // events received:
+      assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(400);
+      // events queued:
+      assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(400);
+      // events distributed
+      assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS);
+      // batches distributed:
+      assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10).isTrue();
+      // batches redistributed:
+      assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+    });
 
     vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
   }
@@ -888,24 +886,25 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
     vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
     vm3.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
 
-    ArrayList<Integer> v4Sender1List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln1", 0));
-    ArrayList<Integer> v4Sender2List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln2", 0));
-
-    assertThat(v4Sender1List.get(0).intValue()).isEqualTo(0); // queue size
-    assertThat(v4Sender1List.get(1).intValue()).isEqualTo(NUM_PUTS); // eventsReceived
-    assertThat(v4Sender1List.get(2).intValue()).isEqualTo(NUM_PUTS); // events queued
-    assertThat(v4Sender1List.get(3).intValue()).isEqualTo(NUM_PUTS); // events distributed
-    assertThat(v4Sender1List.get(4) >= 10).isTrue(); // batches distributed
-    assertThat(v4Sender1List.get(5).intValue()).isEqualTo(0); // batches redistributed
-
-    assertThat(v4Sender2List.get(0).intValue()).isEqualTo(0); // queue size
-    assertThat(v4Sender2List.get(1).intValue()).isEqualTo(NUM_PUTS); // eventsReceived
-    assertThat(v4Sender2List.get(2).intValue()).isEqualTo(NUM_PUTS); // events queued
-    assertThat(v4Sender2List.get(3).intValue()).isEqualTo(NUM_PUTS); // events distributed
-    assertThat(v4Sender2List.get(4) >= 10).isTrue(); // batches distributed
-    assertThat(v4Sender2List.get(5).intValue()).isEqualTo(0); // batches redistributed
+    await().untilAsserted(() -> {
+      List<Integer> v4Sender1List = vm4.invoke(() -> WANTestBase.getSenderStats("ln1", 0));
+      assertThat(v4Sender1List.get(0).intValue()).isEqualTo(0); // queue size
+      assertThat(v4Sender1List.get(1).intValue()).isEqualTo(NUM_PUTS); // eventsReceived
+      assertThat(v4Sender1List.get(2).intValue()).isEqualTo(NUM_PUTS); // events queued
+      assertThat(v4Sender1List.get(3).intValue()).isEqualTo(NUM_PUTS); // events distributed
+      assertThat(v4Sender1List.get(4) >= 10).isTrue(); // batches distributed
+      assertThat(v4Sender1List.get(5).intValue()).isEqualTo(0); // batches redistributed
+    });
+
+    await().untilAsserted(() -> {
+      List<Integer> v4Sender2List = vm4.invoke(() -> WANTestBase.getSenderStats("ln2", 0));
+      assertThat(v4Sender2List.get(0).intValue()).isEqualTo(0); // queue size
+      assertThat(v4Sender2List.get(1).intValue()).isEqualTo(NUM_PUTS); // eventsReceived
+      assertThat(v4Sender2List.get(2).intValue()).isEqualTo(NUM_PUTS); // events queued
+      assertThat(v4Sender2List.get(3).intValue()).isEqualTo(NUM_PUTS); // events distributed
+      assertThat(v4Sender2List.get(4) >= 10).isTrue(); // batches distributed
+      assertThat(v4Sender2List.get(5).intValue()).isEqualTo(0); // batches redistributed
+    });
 
     vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
     vm3.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
@@ -944,25 +943,27 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
 
     vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 1000));
 
-    ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
-    assertThat(v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); // queue size
-    int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1);
-    // We may see a single retried event on all members due to the kill
-    assertThat(3000 <= receivedEvents && 3003 >= receivedEvents).as("Received " + receivedEvents)
-        .isTrue(); // eventsReceived
-    int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2);
-    assertThat(3000 <= queuedEvents && 3003 >= queuedEvents).as("Queued " + queuedEvents).isTrue(); // eventsQueued
-    // assertTrue(10000 <= v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed :
-    // its quite possible that vm4 has distributed some of the events
-    // assertTrue(v5List.get(4) + v6List.get(4) + v7List.get(4) > 1000); //batches distributed : its
-    // quite possible that vm4 has distributed some of the batches.
-    assertThat(v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); // batches redistributed
+    await().untilAsserted(() -> {
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+      assertThat(v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); // queue size
+      int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1);
+      // We may see a single retried event on all members due to the kill
+      assertThat(3000 <= receivedEvents && 3003 >= receivedEvents).as("Received " + receivedEvents)
+          .isTrue(); // eventsReceived
+      int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2);
+      assertThat(3000 <= queuedEvents && 3003 >= queuedEvents).as("Queued " + queuedEvents)
+          .isTrue(); // eventsQueued
+      // assertTrue(10000 <= v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed :
+      // its quite possible that vm4 has distributed some of the events
+      // assertTrue(v5List.get(4) + v6List.get(4) + v7List.get(4) > 1000); //batches distributed :
+      // its
+      // quite possible that vm4 has distributed some of the batches.
+      assertThat(v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); // batches
+                                                                              // redistributed
+    });
 
     vm2.invoke(() -> WANTestBase.checkGatewayReceiverStatsHA(NUM_PUTS, 1000, 1000));
   }
@@ -1012,22 +1013,23 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
 
     vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 2000));
 
-    ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
-    assertThat(v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); // queue size
-    int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1);
-    // We may see two retried events (as transactions are made of 2 events) on all members due to
-    // the kill
-    assertThat(6000 <= receivedEvents && 6006 >= receivedEvents).as("Received " + receivedEvents)
-        .isTrue(); // eventsReceived
-    int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2);
-    assertThat(6000 <= queuedEvents && 6006 >= queuedEvents).as("Queued " + queuedEvents).isTrue(); // eventsQueued
-    assertThat(v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); // batches redistributed
+    await().untilAsserted(() -> {
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+      assertThat(v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); // queue size
+      int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1);
+      // We may see two retried events (as transactions are made of 2 events) on all members due to
+      // the kill
+      assertThat(6000 <= receivedEvents && 6006 >= receivedEvents).as("Received " + receivedEvents)
+          .isTrue(); // eventsReceived
+      int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2);
+      assertThat(6000 <= queuedEvents && 6006 >= queuedEvents).as("Queued " + queuedEvents)
+          .isTrue(); // eventsQueued
+      assertThat(v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); // batches
+                                                                              // redistributed
+    });
 
     // batchesReceived is equal to numberOfEntries/(batchSize+1)
     // As transactions are 2 events long, for each batch it will always be necessary to
@@ -1065,19 +1067,17 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
     // verify that all is well in local site. All the events should be present in local region
     vm4.invoke(() -> WANTestBase.validateRegionSize(testName, 2000));
 
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
-    ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
-    ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1));
-    ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1));
-
-    // batches distributed: it's quite possible that vm4 has distributed some of the batches.
-    assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 1).isTrue();
-    // batches redistributed:
-    assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5) >= 1).isTrue();
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+      // batches distributed: it's quite possible that vm4 has distributed some of the batches.
+      assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 1).isTrue();
+      // batches redistributed:
+      assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5) >= 1).isTrue();
+    });
   }
 
   @Test
@@ -1111,29 +1111,28 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
 
     vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 800));
 
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
-    // queue size:
-    assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
-    // events received:
-    assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(1000);
-    // events queued:
-    assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(900);
-    // events distributed:
-    assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(800);
-    // batches distributed:
-    assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 80).isTrue();
-    // batches redistributed:
-    assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
-    // events filtered:
-    assertThat(v4List.get(6) + v5List.get(6) + v6List.get(6) + v7List.get(6)).isEqualTo(200);
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+      // queue size:
+      assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
+      // events received:
+      assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(1000);
+      // events queued:
+      assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(900);
+      // events distributed:
+      assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(800);
+      // batches distributed:
+      assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 80).isTrue();
+      // batches redistributed:
+      assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+      // events filtered:
+      assertThat(v4List.get(6) + v5List.get(6) + v6List.get(6) + v7List.get(6)).isEqualTo(200);
+    });
+
     vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(80, 800, 800));
   }
 
@@ -1198,28 +1197,30 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
 
     vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));
 
-    List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
-    // Verify final stats
-    // 0 -> eventQueueSize
-    // 1 -> eventsReceived
-    // 2 -> eventsQueued
-    // 3 -> eventsDistributed
-    // 4 -> batchesDistributed
-    // 5 -> batchesRedistributed
-    // 7 -> eventsNotQueuedConflated
-    // 9 -> conflationIndexesMapSize
-    assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
-    assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(200);
-    assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(200);
-    assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(150);
-    assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10).isTrue();
-    assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
-    assertThat(v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)).isEqualTo(50);
-    assertThat(v4List.get(9) + v5List.get(9) + v6List.get(9) + v7List.get(9)).isEqualTo(0);
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+      // Verify final stats
+      // 0 -> eventQueueSize
+      // 1 -> eventsReceived
+      // 2 -> eventsQueued
+      // 3 -> eventsDistributed
+      // 4 -> batchesDistributed
+      // 5 -> batchesRedistributed
+      // 7 -> eventsNotQueuedConflated
+      // 9 -> conflationIndexesMapSize
+      assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
+      assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(200);
+      assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(200);
+      assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(150);
+      assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10).isTrue();
+      assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+      assertThat(v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)).isEqualTo(50);
+      assertThat(v4List.get(9) + v5List.get(9) + v6List.get(9) + v7List.get(9)).isEqualTo(0);
+    });
   }
 
   @Test
@@ -1441,12 +1442,14 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
   }
 
   private void verifyConflationIndexesSize(String senderId, int expectedSize, VM... vms) {
-    int actualSize = 0;
-    for (VM vm : vms) {
-      List<Integer> stats = vm.invoke(() -> WANTestBase.getSenderStats(senderId, -1));
-      actualSize += stats.get(9);
-    }
-    assertThat(actualSize).isEqualTo(expectedSize);
+    await().untilAsserted(() -> {
+      int actualSize = 0;
+      for (VM vm : vms) {
+        List<Integer> stats = vm.invoke(() -> WANTestBase.getSenderStats(senderId, -1));
+        actualSize += stats.get(9);
+      }
+      assertThat(actualSize).isEqualTo(expectedSize);
+    });
   }
 
   private void putSameEntry(String regionName, int numIterations) {
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
index 7fc0d15fd9..aac27e1393 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
@@ -21,7 +21,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
 
 import java.io.File;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -177,12 +176,13 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase {
     vm4.invoke(() -> WANTestBase.pauseSender("ln"));
 
     vm6.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 1000));
-    ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 1000));
-    // secondary queue size stats in serial queue should be 0
-    assertThat(v4List.get(10) + v5List.get(10)).isEqualTo(0);
+
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 1000));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 1000));
+      // secondary queue size stats in serial queue should be 0
+      assertThat(v4List.get(10) + v5List.get(10)).isEqualTo(0);
+    });
 
     Map<String, List<?>> primarySenderUpdates = vm4.invoke(WANTestBase::checkQueue);
     Map<String, List<?>> secondarySenderUpdates = vm5.invoke(WANTestBase::checkQueue);
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
index 4e5358b478..53d52b7f49 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
@@ -26,8 +26,8 @@ import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.nio.file.Files;
-import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -202,17 +202,15 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
     vm6.invoke(() -> resumeSender("ln"));
     vm7.invoke(() -> resumeSender("ln"));
 
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
-    assertThat((v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0).as(
-        "No events conflated in batch").isTrue();
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+      assertThat((v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0).as(
+          "No events conflated in batch").isTrue();
+    });
   }
 
   @Test
@@ -256,28 +254,28 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
       vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), keyValues));
     }
 
-    ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 20));
-    assertThat((int) v4List.get(0)).as(
-        "After conflation during enqueue, there should be only 20 events").isEqualTo(20);
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 20));
+      assertThat((int) v4List.get(0)).as(
+          "After conflation during enqueue, there should be only 20 events").isEqualTo(20);
+    });
 
     vm4.invoke(() -> resumeSender("ln"));
     vm5.invoke(() -> resumeSender("ln"));
     vm6.invoke(() -> resumeSender("ln"));
     vm7.invoke(() -> resumeSender("ln"));
 
-    v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
-    assertThat((v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10))).as(
-        "No events in secondary queue stats since it's serial sender").isEqualTo(0);
-    assertThat((v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2))).as(
-        "Total queued events should be 100").isEqualTo(100);
+    await().untilAsserted(() -> {
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+      assertThat((v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10))).as(
+          "No events in secondary queue stats since it's serial sender").isEqualTo(0);
+      assertThat((v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2))).as(
+          "Total queued events should be 100").isEqualTo(100);
+    });
 
     vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10));
 
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
index de4a19abaa..a027fdd274 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
@@ -921,28 +921,26 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
 
   private void checkQueuesAreEmptyAndOnlyCompleteTransactionsAreReplicated(
       boolean isBatchesRedistributed) {
-    // Wait for sender queues to be empty
-    List<Integer> v4List =
-        vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    List<Integer> v5List =
-        vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    List<Integer> v6List =
-        vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    List<Integer> v7List =
-        vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
-    // queue size must be 0
-    assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
-
-    // batches redistributed:
-    int batchesRedistributed = v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5);
-    if (isBatchesRedistributed) {
-      assertThat(batchesRedistributed).isGreaterThan(0);
-    } else {
-      assertThat(batchesRedistributed).isEqualTo(0);
-    }
+    await().untilAsserted(() -> {
+      // Wait for sender queues to be empty
+      List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+      List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+      // queue size must be 0
+      assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
+
+      // batches redistributed:
+      int batchesRedistributed = v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5);
+      if (isBatchesRedistributed) {
+        assertThat(batchesRedistributed).isGreaterThan(0);
+      } else {
+        assertThat(batchesRedistributed).isEqualTo(0);
+      }
 
-    // batches with incomplete transactions must be 0
-    assertThat(v4List.get(13) + v5List.get(13) + v6List.get(13) + v7List.get(13)).isEqualTo(0);
+      // batches with incomplete transactions must be 0
+      assertThat(v4List.get(13) + v5List.get(13) + v6List.get(13) + v7List.get(13)).isEqualTo(0);
+    });
   }
 }