You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2022/04/14 15:45:57 UTC
[geode] 03/05: GEODE-8228: Await on stats.
This is an automated email from the ASF dual-hosted git repository.
jbarrett pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 7514b7c53c6fee87896badec387158ba85b8a4c8
Author: Jacob Barrett <jb...@pivotal.io>
AuthorDate: Fri Apr 8 14:50:11 2022 -0700
GEODE-8228: Await on stats.
---
.../geode/internal/cache/wan/WANTestBase.java | 81 ++-
...rallelGatewaySenderAlertThresholdDUnitTest.java | 41 +-
.../ParallelGatewaySenderOperationsDUnitTest.java | 22 +-
.../parallel/ParallelWANConflationDUnitTest.java | 101 ++-
.../wan/parallel/ParallelWANStatsDUnitTest.java | 745 +++++++++++----------
.../serial/SerialGatewaySenderQueueDUnitTest.java | 14 +-
.../wan/serial/SerialWANConflationDUnitTest.java | 54 +-
.../cache/wan/serial/SerialWANStatsDUnitTest.java | 42 +-
8 files changed, 544 insertions(+), 556 deletions(-)
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 62f4ca9749..16ea5f6b91 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -1159,11 +1159,8 @@ public class WANTestBase extends DistributedTestCase {
public static void checkQueueSizeInStats(String senderId, final int expectedQueueSize) {
AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
GatewaySenderStats statistics = sender.getStatistics();
- await()
- .untilAsserted(() -> assertThat(statistics.getEventQueueSize()).as(
- "Expected queue size: " + expectedQueueSize
- + " but actual size: " + statistics.getEventQueueSize())
- .isEqualTo(expectedQueueSize));
+ await().untilAsserted(() -> assertThat(statistics.getEventQueueSize())
+ .isEqualTo(expectedQueueSize));
}
public static void checkConnectionStats(String senderId) {
@@ -1186,11 +1183,7 @@ public class WANTestBase extends DistributedTestCase {
(ConcurrentParallelGatewaySenderQueue) regionQueue;
parallelGatewaySenderQueue.getRegions();
}
- await()
- .untilAsserted(() -> assertThat(regionQueue.size()).as(
- "Expected queue entries: " + expectedQueueSize
- + " but actual entries: " + regionQueue.size())
- .isEqualTo(expectedQueueSize));
+ await().untilAsserted(() -> assertThat(regionQueue.size()).isEqualTo(expectedQueueSize));
}
ArrayList<Integer> stats = new ArrayList<>();
stats.add(statistics.getEventQueueSize());
@@ -1237,10 +1230,12 @@ public class WANTestBase extends DistributedTestCase {
public static void checkQueueStats(String senderId, final int queueSize, final int eventsReceived,
final int eventsQueued, final int eventsDistributed) {
GatewaySenderStats statistics = getGatewaySenderStats(senderId);
- assertThat(statistics.getEventQueueSize()).isEqualTo(queueSize);
- assertThat(statistics.getEventsReceived()).isEqualTo(eventsReceived);
- assertThat(statistics.getEventsQueued()).isEqualTo(eventsQueued);
- assert (statistics.getEventsDistributed() >= eventsDistributed);
+ await().untilAsserted(() -> {
+ assertThat(statistics.getEventQueueSize()).isEqualTo(queueSize);
+ assertThat(statistics.getEventsReceived()).isEqualTo(eventsReceived);
+ assertThat(statistics.getEventsQueued()).isEqualTo(eventsQueued);
+ assertThat(statistics.getEventsDistributed()).isGreaterThanOrEqualTo(eventsDistributed);
+ });
}
public static void checkGatewayReceiverStats(int processBatches, int eventsReceived,
@@ -1322,19 +1317,23 @@ public class WANTestBase extends DistributedTestCase {
public static void checkEventFilteredStats(String senderId, final int eventsFiltered) {
GatewaySenderStats statistics = getGatewaySenderStats(senderId);
- assertThat(statistics.getEventsFiltered()).isEqualTo(eventsFiltered);
+ await()
+ .untilAsserted(() -> assertThat(statistics.getEventsFiltered()).isEqualTo(eventsFiltered));
}
public static void checkConflatedStats(String senderId, final int eventsConflated) {
GatewaySenderStats statistics = getGatewaySenderStats(senderId);
- assertThat(statistics.getEventsNotQueuedConflated()).isEqualTo(eventsConflated);
+ await().untilAsserted(
+ () -> assertThat(statistics.getEventsNotQueuedConflated()).isEqualTo(eventsConflated));
}
public static void checkStats_Failover(String senderId, final int eventsReceived) {
GatewaySenderStats statistics = getGatewaySenderStats(senderId);
- assertThat(statistics.getEventsReceived()).isEqualTo(eventsReceived);
- assertThat((statistics.getEventsQueued() + statistics.getUnprocessedTokensAddedByPrimary()
- + statistics.getUnprocessedEventsRemovedByPrimary())).isEqualTo(eventsReceived);
+ await().untilAsserted(() -> {
+ assertThat(statistics.getEventsReceived()).isEqualTo(eventsReceived);
+ assertThat((statistics.getEventsQueued() + statistics.getUnprocessedTokensAddedByPrimary()
+ + statistics.getUnprocessedEventsRemovedByPrimary())).isEqualTo(eventsReceived);
+ });
}
public static void checkBatchStats(String senderId, final int batches) {
@@ -1343,38 +1342,46 @@ public class WANTestBase extends DistributedTestCase {
public static void checkBatchStats(String senderId, final int batches, boolean isExact) {
GatewaySenderStats statistics = getGatewaySenderStats(senderId);
- if (isExact) {
- assert (statistics.getBatchesDistributed() == batches);
- } else {
- assert (statistics.getBatchesDistributed() >= batches);
- }
- assertThat(statistics.getBatchesRedistributed()).isEqualTo(0);
+ await().untilAsserted(() -> {
+ if (isExact) {
+ assertThat(statistics.getBatchesDistributed()).isEqualTo(batches);
+ } else {
+ assertThat(statistics.getBatchesDistributed()).isGreaterThanOrEqualTo(batches);
+ }
+ assertThat(statistics.getBatchesRedistributed()).isEqualTo(0);
+ });
}
public static void checkBatchStats(String senderId, final int batches,
boolean isExact, final boolean batchesRedistributed) {
GatewaySenderStats statistics = getGatewaySenderStats(senderId);
- if (isExact) {
- assert (statistics.getBatchesDistributed() == batches);
- } else {
- assert (statistics.getBatchesDistributed() >= batches);
- }
- assertThat((statistics.getBatchesRedistributed() > 0)).isEqualTo(batchesRedistributed);
+ await().untilAsserted(() -> {
+ if (isExact) {
+ assertThat(statistics.getBatchesDistributed()).isEqualTo(batches);
+ } else {
+ assertThat(statistics.getBatchesDistributed()).isGreaterThanOrEqualTo(batches);
+ }
+ assertThat((statistics.getBatchesRedistributed() > 0)).isEqualTo(batchesRedistributed);
+ });
}
public static void checkBatchStats(String senderId, final boolean batchesDistributed,
final boolean batchesRedistributed) {
GatewaySenderStats statistics = getGatewaySenderStats(senderId);
- assertThat((statistics.getBatchesDistributed() > 0)).isEqualTo(batchesDistributed);
- assertThat((statistics.getBatchesRedistributed() > 0)).isEqualTo(batchesRedistributed);
+ await().untilAsserted(() -> {
+ assertThat((statistics.getBatchesDistributed() > 0)).isEqualTo(batchesDistributed);
+ assertThat((statistics.getBatchesRedistributed() > 0)).isEqualTo(batchesRedistributed);
+ });
}
public static void checkUnProcessedStats(String senderId, int events) {
GatewaySenderStats statistics = getGatewaySenderStats(senderId);
- assertThat((statistics.getUnprocessedEventsAddedBySecondary()
- + statistics.getUnprocessedTokensRemovedBySecondary())).isEqualTo(events);
- assertThat((statistics.getUnprocessedEventsRemovedByPrimary()
- + statistics.getUnprocessedTokensAddedByPrimary())).isEqualTo(events);
+ await().untilAsserted(() -> {
+ assertThat((statistics.getUnprocessedEventsAddedBySecondary()
+ + statistics.getUnprocessedTokensRemovedBySecondary())).isEqualTo(events);
+ assertThat((statistics.getUnprocessedEventsRemovedByPrimary()
+ + statistics.getUnprocessedTokensAddedByPrimary())).isEqualTo(events);
+ });
}
public static GatewaySenderStats getGatewaySenderStats(String senderId) {
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderAlertThresholdDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderAlertThresholdDUnitTest.java
index 2b0c332e4a..f7e2f77f85 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderAlertThresholdDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderAlertThresholdDUnitTest.java
@@ -17,7 +17,7 @@ package org.apache.geode.internal.cache.wan.parallel;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
-import java.util.ArrayList;
+import java.util.List;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -86,17 +86,16 @@ public class ParallelGatewaySenderAlertThresholdDUnitTest extends WANTestBase {
vm2.invoke(serializableRunnableIF);
vm3.invoke(serializableRunnableIF);
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
- ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
- ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1));
- ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1));
- assertThat((v4List.get(12) + v5List.get(12) + v6List.get(12) + v7List.get(12)) > 0).as(
- "GatewaySenders Stats should contain number of EventsExceedingAlertThreshold > 0").isTrue();
+ assertThat((v4List.get(12) + v5List.get(12) + v6List.get(12) + v7List.get(12)) > 0).as(
+ "GatewaySenders Stats should contain number of EventsExceedingAlertThreshold > 0")
+ .isTrue();
+ });
int v4alert = vm4.invoke(
ParallelGatewaySenderAlertThresholdDUnitTest::checkSenderMBeanAlertThreshold);
@@ -161,18 +160,16 @@ public class ParallelGatewaySenderAlertThresholdDUnitTest extends WANTestBase {
vm2.invoke(serializableRunnableIF);
vm3.invoke(serializableRunnableIF);
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
- ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
- ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1));
- ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1));
- assertThat(0).as(
- "GatewaySenders Stats should contain number of EventsExceedingAlertThreshold = 0")
- .isEqualTo((v4List.get(12) + v5List.get(12) + v6List.get(12) + v7List.get(12)));
+ assertThat(0).as(
+ "GatewaySenders Stats should contain number of EventsExceedingAlertThreshold = 0")
+ .isEqualTo((v4List.get(12) + v5List.get(12) + v6List.get(12) + v7List.get(12)));
+ });
int v4alert = vm4.invoke(
ParallelGatewaySenderAlertThresholdDUnitTest::checkSenderMBeanAlertThreshold);
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index a9fe2e04c3..78e2aa2360 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -479,14 +479,10 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
async.join();
- ArrayList<Integer> vm4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
- ArrayList<Integer> vm5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
- ArrayList<Integer> vm6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
- ArrayList<Integer> vm7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
+ List<Integer> vm4List = vm4.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
+ List<Integer> vm5List = vm5.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
+ List<Integer> vm6List = vm6.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
+ List<Integer> vm7List = vm7.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
if (vm4List.get(0) + vm5List.get(0) + vm6List.get(0) + vm7List.get(0) > 0) {
foundEventsDroppedDueToPrimarySenderNotRunning = true;
}
@@ -1258,12 +1254,10 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
.isEqualTo(100);
await().untilAsserted(() -> {
- int vm4SecondarySize = vm4.invoke(() -> getSecondaryQueueSizeInStats("ln"));
- int vm5SecondarySize = vm5.invoke(() -> getSecondaryQueueSizeInStats("ln"));
- int vm6SecondarySize = vm6.invoke(() -> getSecondaryQueueSizeInStats("ln"));
- int vm7SecondarySize = vm7.invoke(() -> getSecondaryQueueSizeInStats("ln"));
- assertThat(vm4SecondarySize + vm5SecondarySize + vm6SecondarySize + vm7SecondarySize)
- .isEqualTo(0);
+ assertThat(vm4.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero();
+ assertThat(vm5.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero();
+ assertThat(vm6.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero();
+ assertThat(vm7.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero();
});
}
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
index e7dafb0e95..88fa8a43b3 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
@@ -17,8 +17,8 @@ package org.apache.geode.internal.cache.wan.parallel;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
-import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -120,26 +120,26 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
vm6.invoke(() -> enableConflation("ln"));
vm7.invoke(() -> enableConflation("ln"));
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 100));
- ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 100));
- ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 100));
- ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 100));
- assertThat((v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 100).as(
- "Event in secondary queue should be 100").isTrue();
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 100));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 100));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 100));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 100));
+ assertThat((v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 100).as(
+ "Event in secondary queue should be 100").isTrue();
+ });
resumeSenders();
- v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- assertThat((v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0).as(
- "No events conflated in batch").isTrue();
+ assertThat((v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0).as(
+ "No events conflated in batch").isTrue();
+ });
verifySecondaryEventQueuesDrained();
vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10));
@@ -148,15 +148,10 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
private void verifySecondaryEventQueuesDrained() {
await().untilAsserted(() -> {
- int vm4SecondarySize = vm4.invoke(() -> getSecondaryQueueSizeInStats("ln"));
- int vm5SecondarySize = vm5.invoke(() -> getSecondaryQueueSizeInStats("ln"));
- int vm6SecondarySize = vm6.invoke(() -> getSecondaryQueueSizeInStats("ln"));
- int vm7SecondarySize = vm7.invoke(() -> getSecondaryQueueSizeInStats("ln"));
-
- assertThat(vm4SecondarySize + vm5SecondarySize + vm6SecondarySize + vm7SecondarySize).as(
- "Event in secondary queue should be 0 after dispatched, but actual is " + vm4SecondarySize
- + ":" + vm5SecondarySize + ":" + vm6SecondarySize + ":" + vm7SecondarySize)
- .isEqualTo(0);
+ assertThat(vm4.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero();
+ assertThat(vm5.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero();
+ assertThat(vm6.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero();
+ assertThat(vm7.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero();
});
}
@@ -217,38 +212,34 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
}
private void validateSecondaryEventQueueSize(int expectedNum, int redundancy) {
- ArrayList<Integer> vm4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
- ArrayList<Integer> vm5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
- ArrayList<Integer> vm6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
- ArrayList<Integer> vm7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
- assertThat(
- (vm4List.get(10) + vm5List.get(10) + vm6List.get(10) + vm7List.get(10)) == expectedNum
- * redundancy).as(
- "Event in secondary queue should be " + (expectedNum * redundancy) + ", but is "
- + (vm4List.get(10) + vm5List.get(10) + vm6List.get(10) + vm7List.get(10)))
- .isTrue();
+ await().untilAsserted(() -> {
+ List<Integer> vm4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+ List<Integer> vm5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+ List<Integer> vm6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+ List<Integer> vm7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+ assertThat(
+ (vm4List.get(10) + vm5List.get(10) + vm6List.get(10) + vm7List.get(10)) == expectedNum
+ * redundancy).as(
+ "Event in secondary queue should be " + (expectedNum * redundancy) + ", but is "
+ + (vm4List.get(10) + vm5List.get(10) + vm6List.get(10) + vm7List.get(10)))
+ .isTrue();
+ });
}
private void validateEventsProcessedByPQRM(int expectedNum, int redundancy) {
- ArrayList<Integer> vm4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> vm5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> vm6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> vm7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- assertThat(
- (vm4List.get(11) + vm5List.get(11) + vm6List.get(11) + vm7List.get(11)) == expectedNum
- * redundancy).as(
- "Event processed by queue removal message should be " + (expectedNum * redundancy)
- + ", but is "
- + (vm4List.get(11) + vm5List.get(11) + vm6List.get(11) + vm7List.get(11)))
- .isTrue();
+ await().untilAsserted(() -> {
+ List<Integer> vm4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> vm5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> vm6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> vm7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ assertThat(
+ (vm4List.get(11) + vm5List.get(11) + vm6List.get(11) + vm7List.get(11)) == expectedNum
+ * redundancy).as(
+ "Event processed by queue removal message should be " + (expectedNum * redundancy)
+ + ", but is "
+ + (vm4List.get(11) + vm5List.get(11) + vm6List.get(11) + vm7List.get(11)))
+ .isTrue();
+ });
}
@Test
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index 79f7afb438..ce7eb03b8b 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -102,38 +102,41 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
createReceiverPR(vm2, 1);
putKeyValues();
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
- ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
- ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
- ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
-
- assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(NUM_PUTS); // queue
- // size
- assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(
- NUM_PUTS * 2); // eventsReceived
- assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(
- NUM_PUTS * 2); // events queued
- assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(0); // events
- // distributed
- assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(
- NUM_PUTS); // secondary queue size
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+
+ // queue size
+ assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(NUM_PUTS);
+ // eventsReceived
+ assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(
+ NUM_PUTS * 2);
+ // events queued
+ assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(
+ NUM_PUTS * 2);
+ // events distributed
+ assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(0);
+ // secondary queue size
+ assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(
+ NUM_PUTS);
+
+ System.out.println("Current secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10)
+ + ":" + v6List.get(10) + ":" + v7List.get(10));
+ });
// stop vm7 to trigger rebalance and move some primary buckets
- System.out.println("Current secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10)
- + ":" + v6List.get(10) + ":" + v7List.get(10));
vm7.invoke(WANTestBase::closeCache);
await().untilAsserted(() -> {
int v4secondarySize = vm4.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
int v5secondarySize = vm5.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
int v6secondarySize = vm6.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
assertThat(v4secondarySize + v5secondarySize + v6secondarySize).isEqualTo(NUM_PUTS);
+ System.out
+ .println("New secondary queue sizes:" + v4secondarySize + ":" + v5secondarySize + ":"
+ + v6secondarySize);
});
- System.out.println("New secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10) + ":"
- + v6List.get(10));
vm7.invoke(() -> WANTestBase.createCache(lnPort));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true));
@@ -141,16 +144,17 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
startSenderInVMs("ln", vm7);
vm7.invoke(() -> pauseSender("ln"));
- v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
- v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
- v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
- v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
- assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(
- NUM_PUTS); // secondary
- // queue
- // size
- System.out.println("After restart vm7, secondary queue sizes:" + v4List.get(10) + ":"
- + v5List.get(10) + ":" + v6List.get(10) + ":" + v7List.get(10));
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ // secondary queue size
+ assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(
+ NUM_PUTS);
+ System.out.println("After restart vm7, secondary queue sizes:" + v4List.get(10) + ":"
+ + v5List.get(10) + ":" + v6List.get(10) + ":" + v7List.get(10));
+ });
vm4.invoke(() -> WANTestBase.resumeSender("ln"));
vm5.invoke(() -> WANTestBase.resumeSender("ln"));
@@ -162,15 +166,17 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));
- v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
- // events distributed:
- assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS);
- // secondary queue size:
- assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(0);
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ // events distributed:
+ assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS);
+ // secondary queue size:
+ assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(0);
+ });
}
// TODO: add a test without redundancy for primary switch
@@ -192,28 +198,26 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
putKeyValues();
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
- ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
- ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
- ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
-
- // queue size:
- assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(NUM_PUTS);
- // events received:
- assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(
- NUM_PUTS * 2);
- // events queued:
- assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(
- NUM_PUTS * 2);
- // events distributed:
- assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(0);
- // secondary queue size:
- assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(
- NUM_PUTS);
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+
+ // queue size:
+ assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(NUM_PUTS);
+ // events received:
+ assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(
+ NUM_PUTS * 2);
+ // events queued:
+ assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(
+ NUM_PUTS * 2);
+ // events distributed:
+ assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(0);
+ // secondary queue size:
+ assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(
+ NUM_PUTS);
+ });
vm4.invoke(() -> WANTestBase.resumeSender("ln"));
vm5.invoke(() -> WANTestBase.resumeSender("ln"));
@@ -224,15 +228,17 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));
- v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
- // events distributed:
- assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS);
- // secondary queue size:
- assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(0);
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ // events distributed:
+ assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS);
+ // secondary queue size:
+ assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(0);
+ });
}
@Test
@@ -254,27 +260,25 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
putKeyValues();
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
- ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
- ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
- ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
-
- // queue size:
- assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(NUM_PUTS);
- // events received:
- assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(NUM_PUTS);
- // events queued:
- assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(NUM_PUTS);
- // events distributed
- assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(0);
- // batches distributed:
- assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)).isEqualTo(0);
- // batches redistributed
- assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+
+ // queue size:
+ assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(NUM_PUTS);
+ // events received:
+ assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(NUM_PUTS);
+ // events queued:
+ assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(NUM_PUTS);
+ // events distributed
+ assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(0);
+ // batches distributed:
+ assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)).isEqualTo(0);
+ // batches redistributed
+ assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+ });
}
@Test
@@ -297,27 +301,25 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
- // queue size:
- assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
- // eventsReceived:
- assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(NUM_PUTS);
- // events queued:
- assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(NUM_PUTS);
- // events distributed:
- assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS);
- // batches distributed:
- assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10).isTrue();
- // batches redistributed:
- assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ // queue size:
+ assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
+ // eventsReceived:
+ assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(NUM_PUTS);
+ // events queued:
+ assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(NUM_PUTS);
+ // events distributed:
+ assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS);
+ // batches distributed:
+ assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10).isTrue();
+ // batches redistributed:
+ assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+ });
vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
}
@@ -369,27 +371,25 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, transactions));
vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, transactions * 3));
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
- // queue size:
- assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
- // eventsReceived:
- assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(entries);
- // events queued:
- assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(entries);
- // events distributed:
- assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(entries);
- // batches distributed:
- assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)).isEqualTo(2);
- // batches redistributed:
- assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ // queue size:
+ assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
+ // eventsReceived:
+ assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(entries);
+ // events queued:
+ assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(entries);
+ // events distributed:
+ assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(entries);
+ // batches distributed:
+ assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)).isEqualTo(2);
+ // batches redistributed:
+ assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+ });
}
@Test
@@ -496,26 +496,24 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
private void checkQueuesAreEmptyAndOnlyCompleteTransactionsAreReplicated(
boolean isBatchesRedistributed) {
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
- // queue size:
- assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
- // batches redistributed:
- int batchesRedistributed = v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5);
- if (isBatchesRedistributed) {
- assertThat(batchesRedistributed).isGreaterThan(0);
- } else {
- assertThat(batchesRedistributed).isEqualTo(0);
- }
- // batches with incomplete transactions
- assertThat(v4List.get(13) + v5List.get(13) + v6List.get(13) + v7List.get(7)).isEqualTo(0);
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ // queue size:
+ assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
+ // batches redistributed:
+ int batchesRedistributed = v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5);
+ if (isBatchesRedistributed) {
+ assertThat(batchesRedistributed).isGreaterThan(0);
+ } else {
+ assertThat(batchesRedistributed).isEqualTo(0);
+ }
+ // batches with incomplete transactions
+ assertThat(v4List.get(13) + v5List.get(13) + v6List.get(13) + v7List.get(7)).isEqualTo(0);
+ });
vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
@@ -577,31 +575,29 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, transactions));
vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, transactions * 3));
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
- // queue size:
- assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
- // eventsReceived:
- assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(entries);
- // events queued:
- assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(entries);
- // events distributed:
- assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(entries);
- // batches distributed:
- assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)).isEqualTo(1);
- // batches redistributed:
- assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
- // events not queued conflated:
- assertThat(v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)).isEqualTo(0);
- // batches with incomplete transactions
- assertThat((int) v4List.get(13)).isEqualTo(0);
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ // queue size:
+ assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
+ // eventsReceived:
+ assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(entries);
+ // events queued:
+ assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(entries);
+ // events distributed:
+ assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(entries);
+ // batches distributed:
+ assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)).isEqualTo(1);
+ // batches redistributed:
+ assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+ // events not queued conflated:
+ assertThat(v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)).isEqualTo(0);
+ // batches with incomplete transactions
+ assertThat((int) v4List.get(13)).isEqualTo(0);
+ });
}
@@ -641,29 +637,31 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
vm4.invoke(() -> WANTestBase.validateRegionSize(testName, entries));
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
// The number of batches will be 4 because each
// dispatcher thread (there are 2) will send half the number of entries,
// each on 2 batches.
- int batches = 4;
- // queue size:
- assertThat((int) v4List.get(0)).isEqualTo(0);
- // eventsReceived:
- assertThat((int) v4List.get(1)).isEqualTo(entries);
- // events queued:
- assertThat((int) v4List.get(2)).isEqualTo(entries);
- // events distributed:
- assertThat((int) v4List.get(3)).isEqualTo(entries);
- // batches distributed:
- assertThat((int) v4List.get(4)).isEqualTo(batches);
- // batches redistributed:
- assertThat((int) v4List.get(5)).isEqualTo(0);
- // events not queued conflated:
- assertThat((int) v4List.get(7)).isEqualTo(0);
- // batches with incomplete transactions
- assertThat((int) v4List.get(13)).isEqualTo(batches);
+ final int batches = 4;
+
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ // queue size:
+ assertThat((int) v4List.get(0)).isEqualTo(0);
+ // eventsReceived:
+ assertThat((int) v4List.get(1)).isEqualTo(entries);
+ // events queued:
+ assertThat((int) v4List.get(2)).isEqualTo(entries);
+ // events distributed:
+ assertThat((int) v4List.get(3)).isEqualTo(entries);
+ // batches distributed:
+ assertThat((int) v4List.get(4)).isEqualTo(batches);
+ // batches redistributed:
+ assertThat((int) v4List.get(5)).isEqualTo(0);
+ // events not queued conflated:
+ assertThat((int) v4List.get(7)).isEqualTo(0);
+ // batches with incomplete transactions
+ assertThat((int) v4List.get(13)).isEqualTo(batches);
+ });
vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(batches, entries, entries));
}
@@ -718,21 +716,22 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
createReceiverInVMs(vm2);
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
- // queue size:
- assertThat((int) v4List.get(0)).isEqualTo(0);
- // events received:
- assertThat((int) v4List.get(1)).isEqualTo(entries);
- // events queued:
- assertThat((int) v4List.get(2)).isEqualTo(entries);
- // events distributed:
- assertThat((int) v4List.get(3)).isEqualTo(entries);
- // batches distributed:
- assertThat((int) v4List.get(4)).isEqualTo(3);
- // batches redistributed:
- assertThat((v4List.get(5)) > 0).as("Batch was not redistributed").isTrue();
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ // queue size:
+ assertThat((int) v4List.get(0)).isEqualTo(0);
+ // events received:
+ assertThat((int) v4List.get(1)).isEqualTo(entries);
+ // events queued:
+ assertThat((int) v4List.get(2)).isEqualTo(entries);
+ // events distributed:
+ assertThat((int) v4List.get(3)).isEqualTo(entries);
+ // batches distributed:
+ assertThat((int) v4List.get(4)).isEqualTo(3);
+ // batches redistributed:
+ assertThat((v4List.get(5)) > 0).as("Batch was not redistributed").isTrue();
+ });
}
@Test
@@ -793,23 +792,24 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
createReceiverInVMs(vm2);
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
- // queue size:
- assertThat((int) v4List.get(0)).isEqualTo(0);
- // events received:
- assertThat((int) v4List.get(1)).isEqualTo(entries);
- // events queued:
- assertThat((int) v4List.get(2)).isEqualTo(entries);
- // events distributed:
- assertThat((int) v4List.get(3)).isEqualTo(entries);
- // batches distributed:
- assertThat((int) v4List.get(4)).isEqualTo(2);
- // batches redistributed:
- assertThat((v4List.get(5)) > 0).as("Batch was not redistributed").isTrue();
- // events not queued conflated:
- assertThat((int) v4List.get(7)).isEqualTo(0);
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ // queue size:
+ assertThat((int) v4List.get(0)).isEqualTo(0);
+ // events received:
+ assertThat((int) v4List.get(1)).isEqualTo(entries);
+ // events queued:
+ assertThat((int) v4List.get(2)).isEqualTo(entries);
+ // events distributed:
+ assertThat((int) v4List.get(3)).isEqualTo(entries);
+ // batches distributed:
+ assertThat((int) v4List.get(4)).isEqualTo(2);
+ // batches redistributed:
+ assertThat((v4List.get(5)) > 0).as("Batch was not redistributed").isTrue();
+ // events not queued conflated:
+ assertThat((int) v4List.get(7)).isEqualTo(0);
+ });
}
@Test
@@ -832,27 +832,25 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
- // queue size:
- assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
- // events received:
- assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(400);
- // events queued:
- assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(400);
- // events distributed
- assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS);
- // batches distributed:
- assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10).isTrue();
- // batches redistributed:
- assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ // queue size:
+ assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
+ // events received:
+ assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(400);
+ // events queued:
+ assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(400);
+ // events distributed
+ assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS);
+ // batches distributed:
+ assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10).isTrue();
+ // batches redistributed:
+ assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+ });
vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
}
@@ -888,24 +886,25 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
vm3.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
- ArrayList<Integer> v4Sender1List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln1", 0));
- ArrayList<Integer> v4Sender2List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln2", 0));
-
- assertThat(v4Sender1List.get(0).intValue()).isEqualTo(0); // queue size
- assertThat(v4Sender1List.get(1).intValue()).isEqualTo(NUM_PUTS); // eventsReceived
- assertThat(v4Sender1List.get(2).intValue()).isEqualTo(NUM_PUTS); // events queued
- assertThat(v4Sender1List.get(3).intValue()).isEqualTo(NUM_PUTS); // events distributed
- assertThat(v4Sender1List.get(4) >= 10).isTrue(); // batches distributed
- assertThat(v4Sender1List.get(5).intValue()).isEqualTo(0); // batches redistributed
-
- assertThat(v4Sender2List.get(0).intValue()).isEqualTo(0); // queue size
- assertThat(v4Sender2List.get(1).intValue()).isEqualTo(NUM_PUTS); // eventsReceived
- assertThat(v4Sender2List.get(2).intValue()).isEqualTo(NUM_PUTS); // events queued
- assertThat(v4Sender2List.get(3).intValue()).isEqualTo(NUM_PUTS); // events distributed
- assertThat(v4Sender2List.get(4) >= 10).isTrue(); // batches distributed
- assertThat(v4Sender2List.get(5).intValue()).isEqualTo(0); // batches redistributed
+ await().untilAsserted(() -> {
+ List<Integer> v4Sender1List = vm4.invoke(() -> WANTestBase.getSenderStats("ln1", 0));
+ assertThat(v4Sender1List.get(0).intValue()).isEqualTo(0); // queue size
+ assertThat(v4Sender1List.get(1).intValue()).isEqualTo(NUM_PUTS); // eventsReceived
+ assertThat(v4Sender1List.get(2).intValue()).isEqualTo(NUM_PUTS); // events queued
+ assertThat(v4Sender1List.get(3).intValue()).isEqualTo(NUM_PUTS); // events distributed
+ assertThat(v4Sender1List.get(4) >= 10).isTrue(); // batches distributed
+ assertThat(v4Sender1List.get(5).intValue()).isEqualTo(0); // batches redistributed
+ });
+
+ await().untilAsserted(() -> {
+ List<Integer> v4Sender2List = vm4.invoke(() -> WANTestBase.getSenderStats("ln2", 0));
+ assertThat(v4Sender2List.get(0).intValue()).isEqualTo(0); // queue size
+ assertThat(v4Sender2List.get(1).intValue()).isEqualTo(NUM_PUTS); // eventsReceived
+ assertThat(v4Sender2List.get(2).intValue()).isEqualTo(NUM_PUTS); // events queued
+ assertThat(v4Sender2List.get(3).intValue()).isEqualTo(NUM_PUTS); // events distributed
+ assertThat(v4Sender2List.get(4) >= 10).isTrue(); // batches distributed
+ assertThat(v4Sender2List.get(5).intValue()).isEqualTo(0); // batches redistributed
+ });
vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
vm3.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
@@ -944,25 +943,27 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 1000));
- ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
- assertThat(v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); // queue size
- int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1);
- // We may see a single retried event on all members due to the kill
- assertThat(3000 <= receivedEvents && 3003 >= receivedEvents).as("Received " + receivedEvents)
- .isTrue(); // eventsReceived
- int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2);
- assertThat(3000 <= queuedEvents && 3003 >= queuedEvents).as("Queued " + queuedEvents).isTrue(); // eventsQueued
- // assertTrue(10000 <= v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed :
- // its quite possible that vm4 has distributed some of the events
- // assertTrue(v5List.get(4) + v6List.get(4) + v7List.get(4) > 1000); //batches distributed : its
- // quite possible that vm4 has distributed some of the batches.
- assertThat(v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); // batches redistributed
+ await().untilAsserted(() -> {
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ assertThat(v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); // queue size
+ int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1);
+ // We may see a single retried event on all members due to the kill
+ assertThat(3000 <= receivedEvents && 3003 >= receivedEvents).as("Received " + receivedEvents)
+ .isTrue(); // eventsReceived
+ int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2);
+ assertThat(3000 <= queuedEvents && 3003 >= queuedEvents).as("Queued " + queuedEvents)
+ .isTrue(); // eventsQueued
+ // assertTrue(10000 <= v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed :
+ // its quite possible that vm4 has distributed some of the events
+ // assertTrue(v5List.get(4) + v6List.get(4) + v7List.get(4) > 1000); //batches distributed :
+ // its
+ // quite possible that vm4 has distributed some of the batches.
+ assertThat(v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); // batches
+ // redistributed
+ });
vm2.invoke(() -> WANTestBase.checkGatewayReceiverStatsHA(NUM_PUTS, 1000, 1000));
}
@@ -1012,22 +1013,23 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 2000));
- ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
- assertThat(v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); // queue size
- int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1);
- // We may see two retried events (as transactions are made of 2 events) on all members due to
- // the kill
- assertThat(6000 <= receivedEvents && 6006 >= receivedEvents).as("Received " + receivedEvents)
- .isTrue(); // eventsReceived
- int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2);
- assertThat(6000 <= queuedEvents && 6006 >= queuedEvents).as("Queued " + queuedEvents).isTrue(); // eventsQueued
- assertThat(v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); // batches redistributed
+ await().untilAsserted(() -> {
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ assertThat(v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); // queue size
+ int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1);
+ // We may see two retried events (as transactions are made of 2 events) on all members due to
+ // the kill
+ assertThat(6000 <= receivedEvents && 6006 >= receivedEvents).as("Received " + receivedEvents)
+ .isTrue(); // eventsReceived
+ int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2);
+ assertThat(6000 <= queuedEvents && 6006 >= queuedEvents).as("Queued " + queuedEvents)
+ .isTrue(); // eventsQueued
+ assertThat(v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); // batches
+ // redistributed
+ });
// batchesReceived is equal to numberOfEntries/(batchSize+1)
// As transactions are 2 events long, for each batch it will always be necessary to
@@ -1065,19 +1067,17 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
// verify that all is well in local site. All the events should be present in local region
vm4.invoke(() -> WANTestBase.validateRegionSize(testName, 2000));
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
- ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
- ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1));
- ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1));
-
- // batches distributed: it's quite possible that vm4 has distributed some of the batches.
- assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 1).isTrue();
- // batches redistributed:
- assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5) >= 1).isTrue();
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+ // batches distributed: it's quite possible that vm4 has distributed some of the batches.
+ assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 1).isTrue();
+ // batches redistributed:
+ assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5) >= 1).isTrue();
+ });
}
@Test
@@ -1111,29 +1111,28 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 800));
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
- // queue size:
- assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
- // events received:
- assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(1000);
- // events queued:
- assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(900);
- // events distributed:
- assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(800);
- // batches distributed:
- assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 80).isTrue();
- // batches redistributed:
- assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
- // events filtered:
- assertThat(v4List.get(6) + v5List.get(6) + v6List.get(6) + v7List.get(6)).isEqualTo(200);
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ // queue size:
+ assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
+ // events received:
+ assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(1000);
+ // events queued:
+ assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(900);
+ // events distributed:
+ assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(800);
+ // batches distributed:
+ assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 80).isTrue();
+ // batches redistributed:
+ assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+ // events filtered:
+ assertThat(v4List.get(6) + v5List.get(6) + v6List.get(6) + v7List.get(6)).isEqualTo(200);
+ });
+
vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(80, 800, 800));
}
@@ -1198,28 +1197,30 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));
- List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
- // Verify final stats
- // 0 -> eventQueueSize
- // 1 -> eventsReceived
- // 2 -> eventsQueued
- // 3 -> eventsDistributed
- // 4 -> batchesDistributed
- // 5 -> batchesRedistributed
- // 7 -> eventsNotQueuedConflated
- // 9 -> conflationIndexesMapSize
- assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
- assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(200);
- assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(200);
- assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(150);
- assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10).isTrue();
- assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
- assertThat(v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)).isEqualTo(50);
- assertThat(v4List.get(9) + v5List.get(9) + v6List.get(9) + v7List.get(9)).isEqualTo(0);
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ // Verify final stats
+ // 0 -> eventQueueSize
+ // 1 -> eventsReceived
+ // 2 -> eventsQueued
+ // 3 -> eventsDistributed
+ // 4 -> batchesDistributed
+ // 5 -> batchesRedistributed
+ // 7 -> eventsNotQueuedConflated
+ // 9 -> conflationIndexesMapSize
+ assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
+ assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(200);
+ assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(200);
+ assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(150);
+ assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10).isTrue();
+ assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0);
+ assertThat(v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)).isEqualTo(50);
+ assertThat(v4List.get(9) + v5List.get(9) + v6List.get(9) + v7List.get(9)).isEqualTo(0);
+ });
}
@Test
@@ -1441,12 +1442,14 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
}
private void verifyConflationIndexesSize(String senderId, int expectedSize, VM... vms) {
- int actualSize = 0;
- for (VM vm : vms) {
- List<Integer> stats = vm.invoke(() -> WANTestBase.getSenderStats(senderId, -1));
- actualSize += stats.get(9);
- }
- assertThat(actualSize).isEqualTo(expectedSize);
+ await().untilAsserted(() -> {
+ int actualSize = 0;
+ for (VM vm : vms) {
+ List<Integer> stats = vm.invoke(() -> WANTestBase.getSenderStats(senderId, -1));
+ actualSize += stats.get(9);
+ }
+ assertThat(actualSize).isEqualTo(expectedSize);
+ });
}
private void putSameEntry(String regionName, int numIterations) {
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
index 7fc0d15fd9..aac27e1393 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
@@ -21,7 +21,6 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import java.io.File;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -177,12 +176,13 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase {
vm4.invoke(() -> WANTestBase.pauseSender("ln"));
vm6.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 1000));
- ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 1000));
- // secondary queue size stats in serial queue should be 0
- assertThat(v4List.get(10) + v5List.get(10)).isEqualTo(0);
+
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 1000));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 1000));
+ // secondary queue size stats in serial queue should be 0
+ assertThat(v4List.get(10) + v5List.get(10)).isEqualTo(0);
+ });
Map<String, List<?>> primarySenderUpdates = vm4.invoke(WANTestBase::checkQueue);
Map<String, List<?>> secondarySenderUpdates = vm5.invoke(WANTestBase::checkQueue);
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
index 4e5358b478..53d52b7f49 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
@@ -26,8 +26,8 @@ import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
-import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -202,17 +202,15 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
vm6.invoke(() -> resumeSender("ln"));
vm7.invoke(() -> resumeSender("ln"));
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
- assertThat((v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0).as(
- "No events conflated in batch").isTrue();
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ assertThat((v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0).as(
+ "No events conflated in batch").isTrue();
+ });
}
@Test
@@ -256,28 +254,28 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), keyValues));
}
- ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 20));
- assertThat((int) v4List.get(0)).as(
- "After conflation during enqueue, there should be only 20 events").isEqualTo(20);
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 20));
+ assertThat((int) v4List.get(0)).as(
+ "After conflation during enqueue, there should be only 20 events").isEqualTo(20);
+ });
vm4.invoke(() -> resumeSender("ln"));
vm5.invoke(() -> resumeSender("ln"));
vm6.invoke(() -> resumeSender("ln"));
vm7.invoke(() -> resumeSender("ln"));
- v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
- assertThat((v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10))).as(
- "No events in secondary queue stats since it's serial sender").isEqualTo(0);
- assertThat((v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2))).as(
- "Total queued events should be 100").isEqualTo(100);
+ await().untilAsserted(() -> {
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ assertThat((v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10))).as(
+ "No events in secondary queue stats since it's serial sender").isEqualTo(0);
+ assertThat((v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2))).as(
+ "Total queued events should be 100").isEqualTo(100);
+ });
vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10));
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
index de4a19abaa..a027fdd274 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
@@ -921,28 +921,26 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
private void checkQueuesAreEmptyAndOnlyCompleteTransactionsAreReplicated(
boolean isBatchesRedistributed) {
- // Wait for sender queues to be empty
- List<Integer> v4List =
- vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- List<Integer> v5List =
- vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- List<Integer> v6List =
- vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
- List<Integer> v7List =
- vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
- // queue size must be 0
- assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
-
- // batches redistributed:
- int batchesRedistributed = v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5);
- if (isBatchesRedistributed) {
- assertThat(batchesRedistributed).isGreaterThan(0);
- } else {
- assertThat(batchesRedistributed).isEqualTo(0);
- }
+ await().untilAsserted(() -> {
+ // Wait for sender queues to be empty
+ List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ // queue size must be 0
+ assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0);
+
+ // batches redistributed:
+ int batchesRedistributed = v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5);
+ if (isBatchesRedistributed) {
+ assertThat(batchesRedistributed).isGreaterThan(0);
+ } else {
+ assertThat(batchesRedistributed).isEqualTo(0);
+ }
- // batches with incomplete transactions must be 0
- assertThat(v4List.get(13) + v5List.get(13) + v6List.get(13) + v7List.get(13)).isEqualTo(0);
+ // batches with incomplete transactions must be 0
+ assertThat(v4List.get(13) + v5List.get(13) + v6List.get(13) + v7List.get(13)).isEqualTo(0);
+ });
}
}