You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2021/04/30 01:16:50 UTC
[geode] 12/17: GEODE-9132:
PartitionedRegionClearWithConcurrentOperationsDUnitTest cleanup 2
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 5e007a606b8377ebcbe45e3a2a512470aaa0a563
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Mon Apr 19 16:16:17 2021 -0700
GEODE-9132: PartitionedRegionClearWithConcurrentOperationsDUnitTest cleanup 2
---
...gionClearWithConcurrentOperationsDUnitTest.java | 782 ++++++++++-----------
1 file changed, 378 insertions(+), 404 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
index c9a1e5b..b2aacc0 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
@@ -72,8 +72,6 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
private static final int BUCKETS = 13;
private static final String REGION_NAME = "PartitionedRegion";
- private static final String TEST_CASE_NAME =
- "[{index}] {method}(Coordinator:{0}, RegionType:{1})";
@Rule
public DistributedRule distributedRule = new DistributedRule(3);
@@ -84,32 +82,6 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
private VM server2;
private VM accessor;
- @SuppressWarnings("unused")
- static TestVM[] coordinators() {
- return new TestVM[] {
- TestVM.SERVER1, TestVM.ACCESSOR
- };
- }
-
- @SuppressWarnings("unused")
- static Object[] coordinatorsAndRegionTypes() {
- List<Object[]> parameters = new ArrayList<>();
- RegionShortcut[] regionShortcuts = regionTypes();
-
- Arrays.stream(regionShortcuts).forEach(regionShortcut -> {
- parameters.add(new Object[] {TestVM.SERVER1, regionShortcut});
- parameters.add(new Object[] {TestVM.ACCESSOR, regionShortcut});
- });
-
- return parameters.toArray();
- }
-
- private static RegionShortcut[] regionTypes() {
- return new RegionShortcut[] {
- RegionShortcut.PARTITION, RegionShortcut.PARTITION_REDUNDANT
- };
- }
-
@Before
public void setUp() throws Exception {
server1 = getVM(TestVM.SERVER1.vmNumber);
@@ -117,127 +89,414 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
accessor = getVM(TestVM.ACCESSOR.vmNumber);
}
- private void initAccessor(RegionShortcut regionShortcut) {
- PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>()
- .setTotalNumBuckets(BUCKETS)
- .setLocalMaxMemory(0)
- .create();
+ /**
+ * The test does the following (clear coordinator and regionType are parametrized):
+ * - Launches one thread per VM to continuously execute removes, puts and gets for a given time.
+ * - Clears the Partition Region continuously every X milliseconds for a given time.
+ * - Asserts that, after the clears have finished, the Region Buckets are consistent across
+ * members.
+ */
+ @Test
+ @Parameters({"SERVER1,PARTITION", "ACCESSOR,PARTITION",
+ "SERVER1,PARTITION_REDUNDANT", "ACCESSOR,PARTITION_REDUNDANT"})
+ @TestCaseName("[{index}] {method}(Coordinator:{0}, RegionType:{1})")
+ public void clearWithConcurrentPutGetRemoveShouldWorkCorrectly(TestVM coordinatorVM,
+ RegionShortcut regionShortcut) throws InterruptedException {
+ parametrizedSetup(regionShortcut);
- cacheRule.getCache().createRegionFactory(regionShortcut)
- .setPartitionAttributes(attrs)
- .create(REGION_NAME);
+ // Let all VMs continuously execute puts, gets and removes for 60 seconds.
+ final int workMillis = 60000;
+ final int entries = 15000;
+ List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+ server1.invokeAsync(() -> executePuts(entries, workMillis)),
+ server2.invokeAsync(() -> executeGets(entries, workMillis)),
+ accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
- }
+ // Clear the region every second for 60 seconds.
+ getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 1000));
- private void initDataStore(RegionShortcut regionShortcut) {
- PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>()
- .setTotalNumBuckets(BUCKETS)
- .create();
+ // Let asyncInvocations finish.
+ for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+ asyncInvocation.await();
+ }
- cacheRule.getCache().createRegionFactory(regionShortcut)
- .setPartitionAttributes(attrs)
- .create(REGION_NAME);
+ // Assert Region Buckets are consistent.
+ asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
+ accessor.invoke(this::assertRegionBucketsConsistency);
}
- private void parametrizedSetup(RegionShortcut regionShortcut) {
- server1.invoke(() -> initDataStore(regionShortcut));
- server2.invoke(() -> initDataStore(regionShortcut));
- accessor.invoke(() -> initAccessor(regionShortcut));
- }
+ /**
+ * The test does the following (clear coordinator and regionType are parametrized):
+ * - Launches two threads per VM to continuously execute putAll and removeAll for a given time.
+ * - Clears the Partition Region continuously every X milliseconds for a given time.
+ * - Asserts that, after the clears have finished, the Region Buckets are consistent across
+ * members.
+ */
+ @Test
+ @Parameters({"SERVER1,PARTITION", "ACCESSOR,PARTITION",
+ "SERVER1,PARTITION_REDUNDANT", "ACCESSOR,PARTITION_REDUNDANT"})
+ @TestCaseName("[{index}] {method}(Coordinator:{0}, RegionType:{1})")
+ public void clearWithConcurrentPutAllRemoveAllShouldWorkCorrectly(TestVM coordinatorVM,
+ RegionShortcut regionShortcut) throws InterruptedException {
+ parametrizedSetup(regionShortcut);
- private void waitForSilence() {
- DMStats dmStats = cacheRule.getSystem().getDistributionManager().getStats();
- PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
- PartitionedRegionStats partitionedRegionStats = region.getPrStats();
+ // Let all VMs continuously execute putAll and removeAll for 15 seconds.
+ final int workMillis = 15000;
+ List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+ server1.invokeAsync(() -> executePutAlls(0, 2000, workMillis)),
+ server1.invokeAsync(() -> executeRemoveAlls(0, 2000, workMillis)),
+ server2.invokeAsync(() -> executePutAlls(2000, 4000, workMillis)),
+ server2.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)),
+ accessor.invokeAsync(() -> executePutAlls(4000, 6000, workMillis)),
+ accessor.invokeAsync(() -> executeRemoveAlls(4000, 6000, workMillis)));
- await().untilAsserted(() -> {
- assertThat(dmStats.getReplyWaitsInProgress()).isEqualTo(0);
- assertThat(partitionedRegionStats.getVolunteeringInProgress()).isEqualTo(0);
- assertThat(partitionedRegionStats.getBucketCreatesInProgress()).isEqualTo(0);
- assertThat(partitionedRegionStats.getPrimaryTransfersInProgress()).isEqualTo(0);
- assertThat(partitionedRegionStats.getRebalanceBucketCreatesInProgress()).isEqualTo(0);
- assertThat(partitionedRegionStats.getRebalancePrimaryTransfersInProgress()).isEqualTo(0);
- });
+ // Clear the region every half second for 15 seconds.
+ getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 500));
+
+ // Let asyncInvocations finish.
+ for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+ asyncInvocation.await();
+ }
+
+ // Assert Region Buckets are consistent.
+ asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
+ accessor.invoke(this::assertRegionBucketsConsistency);
}
/**
- * Populates the region and verifies the data on the selected VMs.
+ * The test does the following (regionType is parametrized):
+ * - Populates the Partition Region.
+ * - Verifies that the entries are synchronized on all members.
+ * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop the
+ * coordinator VM while the clear is in progress.
+ * - Clears the Partition Region (at this point the coordinator is restarted).
+ * - Asserts that, after the member joins again, the Region Buckets are consistent.
*/
- private void populateRegion(VM feeder, int entryCount, Iterable<VM> vms) {
- feeder.invoke(() -> {
+ @Test
+ @Parameters({"PARTITION", "PARTITION_REDUNDANT"})
+ @TestCaseName("[{index}] {method}(RegionType:{0})")
+ public void clearShouldFailWhenCoordinatorMemberIsBounced(RegionShortcut regionShortcut) {
+ parametrizedSetup(regionShortcut);
+ final int entries = 1000;
+ populateRegion(accessor, entries, asList(accessor, server1, server2));
+
+ // Set the MemberKiller observer on the coordinator and try to clear the region.
+ server1.invoke(() -> {
+ DistributionMessageObserver.setInstance(new MemberKiller(true));
Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
- IntStream.range(0, entryCount).forEach(i -> region.put(String.valueOf(i), "Value_" + i));
+ assertThatThrownBy(region::clear)
+ .isInstanceOf(DistributedSystemDisconnectedException.class)
+ .hasCauseInstanceOf(ForcedDisconnectException.class);
});
- vms.forEach(vm -> vm.invoke(() -> {
- waitForSilence();
- Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+ // Wait for member to get back online and assign all buckets.
+ server1.invoke(() -> {
+ cacheRule.createCache();
+ initDataStore(regionShortcut);
+ await().untilAsserted(
+ () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
+ PartitionRegionHelper.assignBucketsToPartitions(cacheRule.getCache().getRegion(REGION_NAME));
+ });
- IntStream.range(0, entryCount)
- .forEach(i -> assertThat(region.get(String.valueOf(i))).isEqualTo("Value_" + i));
- }));
+ // Assert Region Buckets are consistent.
+ asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
+ accessor.invoke(this::assertRegionBucketsConsistency);
}
/**
- * Asserts that the RegionVersionVectors for both buckets are consistent.
- *
- * @param bucketId Id of the bucket to compare.
- * @param bucketDump1 First bucketDump.
- * @param bucketDump2 Second bucketDump.
+ * The test does the following (clear coordinator is chosen through parameters):
+ * - Populates the Partition Region.
+ * - Verifies that the entries are synchronized on all members.
+ * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
+ * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so
+ * participates in the clear operation).
+ * - Launches two threads per VM to continuously execute gets, puts and removes for a given time.
+ * - Clears the Partition Region (at this point the non-coordinator is restarted).
+ * - Asserts that, after the clear has finished, the Region Buckets are consistent across members.
*/
- private void assertRegionVersionVectorsConsistency(int bucketId, BucketDump bucketDump1,
- BucketDump bucketDump2) {
- RegionVersionVector<?> rvv1 = bucketDump1.getRvv();
- RegionVersionVector<?> rvv2 = bucketDump2.getRvv();
+ @Test
+ @Parameters({"SERVER1", "ACCESSOR"})
+ @TestCaseName("[{index}] {method}(Coordinator:{0})")
+ public void clearOnRedundantPartitionRegionWithConcurrentPutGetRemoveShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced(
+ TestVM coordinatorVM) throws InterruptedException {
+ parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT);
+ final int entries = 7500;
+ populateRegion(accessor, entries, asList(accessor, server1, server2));
+ server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
- if (rvv1 == null) {
- assertThat(rvv2)
- .as("Bucket " + bucketId + " has an RVV on member " + bucketDump2.getMember()
- + ", but does not on member " + bucketDump1.getMember())
- .isNull();
- }
+ // Let all VMs (except the one to kill) continuously execute gets, puts and removes for 30s.
+ final int workMillis = 30000;
+ List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+ server1.invokeAsync(() -> executeGets(entries, workMillis)),
+ server1.invokeAsync(() -> executePuts(entries, workMillis)),
+ accessor.invokeAsync(() -> executeGets(entries, workMillis)),
+ accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
- if (rvv2 == null) {
- assertThat(rvv1)
- .as("Bucket " + bucketId + " has an RVV on member " + bucketDump1.getMember()
- + ", but does not on member " + bucketDump2.getMember())
- .isNull();
- }
+ // Retry the clear operation on the region until success (server2 will go down, but other
+ // members will eventually become primary for those buckets previously hosted by server2).
+ executeClearWithRetry(getVM(coordinatorVM.vmNumber));
- assertThat(rvv1).isNotNull();
- assertThat(rvv2).isNotNull();
- Map<VersionSource<?>, RegionVersionHolder<?>> rvv2Members =
- new HashMap<>(rvv1.getMemberToVersion());
- Map<VersionSource<?>, RegionVersionHolder<?>> rvv1Members =
- new HashMap<>(rvv1.getMemberToVersion());
- for (Map.Entry<VersionSource<?>, RegionVersionHolder<?>> entry : rvv1Members.entrySet()) {
- VersionSource<?> memberId = entry.getKey();
- RegionVersionHolder<?> versionHolder1 = entry.getValue();
- RegionVersionHolder<?> versionHolder2 = rvv2Members.remove(memberId);
- assertThat(versionHolder1)
- .as("RegionVersionVector for bucket " + bucketId + " on member " + bucketDump1.getMember()
- + " is not consistent with member " + bucketDump2.getMember())
- .isEqualTo(versionHolder2);
+ // Wait for member to get back online.
+ server2.invoke(() -> {
+ cacheRule.createCache();
+ initDataStore(RegionShortcut.PARTITION_REDUNDANT);
+ await().untilAsserted(
+ () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
+ });
+
+ // Let asyncInvocations finish.
+ for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+ asyncInvocation.await();
}
+
+ // Assert Region Buckets are consistent.
+ asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
+ accessor.invoke(this::assertRegionBucketsConsistency);
}
/**
- * Asserts that the region data is consistent across buckets.
+ * The test does the following (clear coordinator is chosen through parameters):
+ * - Populates the Partition Region.
+ * - Verifies that the entries are synchronized on all members.
+ * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
+ * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so
+ * participates in the clear operation).
+ * - Launches two threads per VM to continuously execute gets, puts and removes for a given time.
+ * - Clears the Partition Region (at this point the non-coordinator is restarted).
+ * - Asserts that the clear operation failed with PartitionedRegionPartialClearException (primary
+ * buckets on the restarted members are not available).
*/
- private void assertRegionBucketsConsistency() throws ForceReattemptException {
- PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
- // Redundant copies + 1 primary.
- int expectedCopies = region.getRedundantCopies() + 1;
+ @Test
+ @Parameters({"SERVER1", "ACCESSOR"})
+ @TestCaseName("[{index}] {method}(Coordinator:{0})")
+ public void clearOnNonRedundantPartitionRegionWithConcurrentPutGetRemoveShouldFailWhenNonCoordinatorMembersAreBounced(
+ TestVM coordinatorVM) throws InterruptedException {
+ parametrizedSetup(RegionShortcut.PARTITION);
+ final int entries = 7500;
+ populateRegion(accessor, entries, asList(accessor, server1, server2));
+ server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
- for (int bId = 0; bId < BUCKETS; bId++) {
- final int bucketId = bId;
- List<BucketDump> bucketDumps = region.getAllBucketEntries(bucketId);
- assertThat(bucketDumps.size())
- .as("Bucket " + bucketId + " should have " + expectedCopies + " copies, but has "
- + bucketDumps.size())
- .isEqualTo(expectedCopies);
+ // Let all VMs (except the one to kill) continuously execute gets, puts and removes for 30s.
+ final int workMillis = 30000;
+ List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+ server1.invokeAsync(() -> executeGets(entries, workMillis)),
+ server1.invokeAsync(() -> executePuts(entries, workMillis)),
+ accessor.invokeAsync(() -> executeGets(entries, workMillis)),
+ accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
- // Check that all copies of the bucket have the same data.
+ // Clear the region.
+ getVM(coordinatorVM.vmNumber).invoke(() -> {
+ assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear())
+ .isInstanceOf(PartitionedRegionPartialClearException.class);
+ });
+
+ // Let asyncInvocations finish.
+ for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+ asyncInvocation.await();
+ }
+ }
+
+ /**
+ * The test does the following (clear coordinator is chosen through parameters):
+ * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
+ * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so
+ * participates in the clear operation).
+ * - Launches one thread per VM to continuously execute putAll/removeAll for a given time.
+ * - Clears the Partition Region (at this point the non-coordinator is restarted).
+ * - Asserts that, after the clear has finished, the Region Buckets are consistent across members.
+ */
+ @Test
+ @Parameters({"SERVER1", "ACCESSOR"})
+ @TestCaseName("[{index}] {method}(Coordinator:{0})")
+ public void clearOnRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced(
+ TestVM coordinatorVM) throws InterruptedException {
+ parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT);
+ server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
+
+ // Let all VMs continuously execute putAll/removeAll for 30 seconds.
+ final int workMillis = 30000;
+ List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+ server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)),
+ accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
+
+ // Retry the clear operation on the region until success (server2 will go down, but other
+ // members will eventually become primary for those buckets previously hosted by server2).
+ executeClearWithRetry(getVM(coordinatorVM.vmNumber));
+
+ // Wait for member to get back online.
+ server2.invoke(() -> {
+ cacheRule.createCache();
+ initDataStore(RegionShortcut.PARTITION_REDUNDANT);
+ await().untilAsserted(
+ () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
+ });
+
+ // Let asyncInvocations finish.
+ for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+ asyncInvocation.await();
+ }
+
+ // Assert Region Buckets are consistent.
+ asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
+ accessor.invoke(this::assertRegionBucketsConsistency);
+ }
+
+ /**
+ * The test does the following (clear coordinator is chosen through parameters):
+ * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
+ * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so
+ * participates in the clear operation).
+ * - Launches one thread per VM to continuously execute putAll/removeAll for a given time.
+ * - Clears the Partition Region (at this point the non-coordinator is restarted).
+ * - Asserts that the clear operation failed with PartitionedRegionPartialClearException (primary
+ * buckets on the restarted members are not available).
+ */
+ @Test
+ @Parameters({"SERVER1", "ACCESSOR"})
+ @TestCaseName("[{index}] {method}(Coordinator:{0})")
+ public void clearOnNonRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldFailWhenNonCoordinatorMembersAreBounced(
+ TestVM coordinatorVM) throws InterruptedException {
+ parametrizedSetup(RegionShortcut.PARTITION);
+ server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
+
+ final int workMillis = 30000;
+ List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+ server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)),
+ accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
+
+ // Clear the region.
+ getVM(coordinatorVM.vmNumber).invoke(() -> {
+ assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear())
+ .isInstanceOf(PartitionedRegionPartialClearException.class);
+ });
+
+ // Let asyncInvocations finish.
+ for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+ asyncInvocation.await();
+ }
+ }
+
+ private void initAccessor(RegionShortcut regionShortcut) {
+ PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>()
+ .setTotalNumBuckets(BUCKETS)
+ .setLocalMaxMemory(0)
+ .create();
+
+ cacheRule.getCache().createRegionFactory(regionShortcut)
+ .setPartitionAttributes(attrs)
+ .create(REGION_NAME);
+
+ }
+
+ private void initDataStore(RegionShortcut regionShortcut) {
+ PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>()
+ .setTotalNumBuckets(BUCKETS)
+ .create();
+
+ cacheRule.getCache().createRegionFactory(regionShortcut)
+ .setPartitionAttributes(attrs)
+ .create(REGION_NAME);
+ }
+
+ private void parametrizedSetup(RegionShortcut regionShortcut) {
+ server1.invoke(() -> initDataStore(regionShortcut));
+ server2.invoke(() -> initDataStore(regionShortcut));
+ accessor.invoke(() -> initAccessor(regionShortcut));
+ }
+
+ private void waitForSilence() {
+ DMStats dmStats = cacheRule.getSystem().getDistributionManager().getStats();
+ PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
+ PartitionedRegionStats partitionedRegionStats = region.getPrStats();
+
+ await().untilAsserted(() -> {
+ assertThat(dmStats.getReplyWaitsInProgress()).isEqualTo(0);
+ assertThat(partitionedRegionStats.getVolunteeringInProgress()).isEqualTo(0);
+ assertThat(partitionedRegionStats.getBucketCreatesInProgress()).isEqualTo(0);
+ assertThat(partitionedRegionStats.getPrimaryTransfersInProgress()).isEqualTo(0);
+ assertThat(partitionedRegionStats.getRebalanceBucketCreatesInProgress()).isEqualTo(0);
+ assertThat(partitionedRegionStats.getRebalancePrimaryTransfersInProgress()).isEqualTo(0);
+ });
+ }
+
+ /**
+ * Populates the region and verifies the data on the selected VMs.
+ */
+ private void populateRegion(VM feeder, int entryCount, Iterable<VM> vms) {
+ feeder.invoke(() -> {
+ Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+ IntStream.range(0, entryCount).forEach(i -> region.put(String.valueOf(i), "Value_" + i));
+ });
+
+ vms.forEach(vm -> vm.invoke(() -> {
+ waitForSilence();
+ Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+
+ IntStream.range(0, entryCount)
+ .forEach(i -> assertThat(region.get(String.valueOf(i))).isEqualTo("Value_" + i));
+ }));
+ }
+
+ /**
+ * Asserts that every member entry of the first dump's RVV equals the second dump's entry.
+ *
+ * @param bucketId Id of the bucket to compare (used only in the failure messages).
+ * @param bucketDump1 First bucketDump; its RVV entries drive the comparison.
+ * @param bucketDump2 Second bucketDump; its RVV entries are looked up by member id.
+ */
+ private void assertRegionVersionVectorsConsistency(int bucketId, BucketDump bucketDump1,
+ BucketDump bucketDump2) {
+ RegionVersionVector<?> rvv1 = bucketDump1.getRvv();
+ RegionVersionVector<?> rvv2 = bucketDump2.getRvv();
+
+ if (rvv1 == null) {
+ assertThat(rvv2)
+ .as("Bucket " + bucketId + " has an RVV on member " + bucketDump2.getMember()
+ + ", but does not on member " + bucketDump1.getMember())
+ .isNull();
+ }
+
+ if (rvv2 == null) {
+ assertThat(rvv1)
+ .as("Bucket " + bucketId + " has an RVV on member " + bucketDump1.getMember()
+ + ", but does not on member " + bucketDump2.getMember())
+ .isNull();
+ }
+
+ assertThat(rvv1).isNotNull();
+ assertThat(rvv2).isNotNull();
+ Map<VersionSource<?>, RegionVersionHolder<?>> rvv2Members =
+ new HashMap<>(rvv2.getMemberToVersion());
+ Map<VersionSource<?>, RegionVersionHolder<?>> rvv1Members =
+ new HashMap<>(rvv1.getMemberToVersion());
+ for (Map.Entry<VersionSource<?>, RegionVersionHolder<?>> entry : rvv1Members.entrySet()) {
+ VersionSource<?> memberId = entry.getKey();
+ RegionVersionHolder<?> versionHolder1 = entry.getValue();
+ RegionVersionHolder<?> versionHolder2 = rvv2Members.remove(memberId);
+ assertThat(versionHolder1)
+ .as("RegionVersionVector for bucket " + bucketId + " on member " + bucketDump1.getMember()
+ + " is not consistent with member " + bucketDump2.getMember())
+ .isEqualTo(versionHolder2);
+ }
+ }
+
+ /**
+ * Asserts that the region data is consistent across buckets.
+ */
+ private void assertRegionBucketsConsistency() throws ForceReattemptException {
+ PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
+ // Redundant copies + 1 primary.
+ int expectedCopies = region.getRedundantCopies() + 1;
+
+ for (int bId = 0; bId < BUCKETS; bId++) {
+ final int bucketId = bId;
+ List<BucketDump> bucketDumps = region.getAllBucketEntries(bucketId);
+ assertThat(bucketDumps.size())
+ .as("Bucket " + bucketId + " should have " + expectedCopies + " copies, but has "
+ + bucketDumps.size())
+ .isEqualTo(expectedCopies);
+
+ // Check that all copies of the bucket have the same data.
if (bucketDumps.size() > 1) {
BucketDump firstDump = bucketDumps.get(0);
@@ -375,291 +634,6 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
}
}
- /**
- * The test does the following (clear coordinator and regionType are parametrized):
- * - Launches one thread per VM to continuously execute removes, puts and gets for a given time.
- * - Clears the Partition Region continuously every X milliseconds for a given time.
- * - Asserts that, after the clears have finished, the Region Buckets are consistent across
- * members.
- */
- @Test
- @TestCaseName(TEST_CASE_NAME)
- @Parameters(method = "coordinatorsAndRegionTypes")
- public void clearWithConcurrentPutGetRemoveShouldWorkCorrectly(TestVM coordinatorVM,
- RegionShortcut regionShortcut) throws InterruptedException {
- parametrizedSetup(regionShortcut);
-
- // Let all VMs continuously execute puts and gets for 60 seconds.
- final int workMillis = 60000;
- final int entries = 15000;
- List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
- server1.invokeAsync(() -> executePuts(entries, workMillis)),
- server2.invokeAsync(() -> executeGets(entries, workMillis)),
- accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
-
- // Clear the region every second for 60 seconds.
- getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 1000));
-
- // Let asyncInvocations finish.
- for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
- asyncInvocation.await();
- }
-
- // Assert Region Buckets are consistent.
- asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
- accessor.invoke(this::assertRegionBucketsConsistency);
- }
-
- /**
- * The test does the following (clear coordinator and regionType are parametrized):
- * - Launches two threads per VM to continuously execute putAll and removeAll for a given time.
- * - Clears the Partition Region continuously every X milliseconds for a given time.
- * - Asserts that, after the clears have finished, the Region Buckets are consistent across
- * members.
- */
- @Test
- @TestCaseName(TEST_CASE_NAME)
- @Parameters(method = "coordinatorsAndRegionTypes")
- public void clearWithConcurrentPutAllRemoveAllShouldWorkCorrectly(TestVM coordinatorVM,
- RegionShortcut regionShortcut) throws InterruptedException {
- parametrizedSetup(regionShortcut);
-
- // Let all VMs continuously execute putAll and removeAll for 15 seconds.
- final int workMillis = 15000;
- List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
- server1.invokeAsync(() -> executePutAlls(0, 2000, workMillis)),
- server1.invokeAsync(() -> executeRemoveAlls(0, 2000, workMillis)),
- server2.invokeAsync(() -> executePutAlls(2000, 4000, workMillis)),
- server2.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)),
- accessor.invokeAsync(() -> executePutAlls(4000, 6000, workMillis)),
- accessor.invokeAsync(() -> executeRemoveAlls(4000, 6000, workMillis)));
-
- // Clear the region every half second for 15 seconds.
- getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 500));
-
- // Let asyncInvocations finish.
- for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
- asyncInvocation.await();
- }
-
- // Assert Region Buckets are consistent.
- asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
- accessor.invoke(this::assertRegionBucketsConsistency);
- }
-
- /**
- * The test does the following (regionType is parametrized):
- * - Populates the Partition Region.
- * - Verifies that the entries are synchronized on all members.
- * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop the
- * coordinator VM while the clear is in progress.
- * - Clears the Partition Region (at this point the coordinator is restarted).
- * - Asserts that, after the member joins again, the Region Buckets are consistent.
- */
- @Test
- @TestCaseName("[{index}] {method}(RegionType:{0})")
- @Parameters(method = "regionTypes")
- public void clearShouldFailWhenCoordinatorMemberIsBounced(RegionShortcut regionShortcut) {
- parametrizedSetup(regionShortcut);
- final int entries = 1000;
- populateRegion(accessor, entries, asList(accessor, server1, server2));
-
- // Set the CoordinatorMemberKiller and try to clear the region
- server1.invoke(() -> {
- DistributionMessageObserver.setInstance(new MemberKiller(true));
- Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
- assertThatThrownBy(region::clear)
- .isInstanceOf(DistributedSystemDisconnectedException.class)
- .hasCauseInstanceOf(ForcedDisconnectException.class);
- });
-
- // Wait for member to get back online and assign all buckets.
- server1.invoke(() -> {
- cacheRule.createCache();
- initDataStore(regionShortcut);
- await().untilAsserted(
- () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
- PartitionRegionHelper.assignBucketsToPartitions(cacheRule.getCache().getRegion(REGION_NAME));
- });
-
- // Assert Region Buckets are consistent.
- asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
- accessor.invoke(this::assertRegionBucketsConsistency);
- }
-
- /**
- * The test does the following (clear coordinator is chosen through parameters):
- * - Populates the Partition Region.
- * - Verifies that the entries are synchronized on all members.
- * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
- * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so
- * participates on the clear operation).
- * - Launches two threads per VM to continuously execute gets, puts and removes for a given time.
- * - Clears the Partition Region (at this point the non-coordinator is restarted).
- * - Asserts that, after the clear has finished, the Region Buckets are consistent across members.
- */
- @Test
- @Parameters(method = "coordinators")
- @TestCaseName("[{index}] {method}(Coordinator:{0})")
- public void clearOnRedundantPartitionRegionWithConcurrentPutGetRemoveShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced(
- TestVM coordinatorVM) throws InterruptedException {
- parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT);
- final int entries = 7500;
- populateRegion(accessor, entries, asList(accessor, server1, server2));
- server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
-
- // Let all VMs (except the one to kill) continuously execute gets, put and removes for 30"
- final int workMillis = 30000;
- List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
- server1.invokeAsync(() -> executeGets(entries, workMillis)),
- server1.invokeAsync(() -> executePuts(entries, workMillis)),
- accessor.invokeAsync(() -> executeGets(entries, workMillis)),
- accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
-
- // Retry the clear operation on the region until success (server2 will go down, but other
- // members will eventually become primary for those buckets previously hosted by server2).
- executeClearWithRetry(getVM(coordinatorVM.vmNumber));
-
- // Wait for member to get back online.
- server2.invoke(() -> {
- cacheRule.createCache();
- initDataStore(RegionShortcut.PARTITION_REDUNDANT);
- await().untilAsserted(
- () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
- });
-
- // Let asyncInvocations finish.
- for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
- asyncInvocation.await();
- }
-
- // Assert Region Buckets are consistent.
- asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
- accessor.invoke(this::assertRegionBucketsConsistency);
- }
-
- /**
- * The test does the following (clear coordinator is chosen through parameters):
- * - Populates the Partition Region.
- * - Verifies that the entries are synchronized on all members.
- * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
- * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so
- * participates on the clear operation).
- * - Launches two threads per VM to continuously execute gets, puts and removes for a given time.
- * - Clears the Partition Region (at this point the non-coordinator is restarted).
- * - Asserts that the clear operation failed with PartitionedRegionPartialClearException (primary
- * buckets on the the restarted members are not available).
- */
- @Test
- @Parameters(method = "coordinators")
- @TestCaseName("[{index}] {method}(Coordinator:{0})")
- public void clearOnNonRedundantPartitionRegionWithConcurrentPutGetRemoveShouldFailWhenNonCoordinatorMembersAreBounced(
- TestVM coordinatorVM) throws InterruptedException {
- parametrizedSetup(RegionShortcut.PARTITION);
- final int entries = 7500;
- populateRegion(accessor, entries, asList(accessor, server1, server2));
- server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
-
- // Let all VMs (except the one to kill) continuously execute gets, put and removes for 30"
- final int workMillis = 30000;
- List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
- server1.invokeAsync(() -> executeGets(entries, workMillis)),
- server1.invokeAsync(() -> executePuts(entries, workMillis)),
- accessor.invokeAsync(() -> executeGets(entries, workMillis)),
- accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
-
- // Clear the region.
- getVM(coordinatorVM.vmNumber).invoke(() -> {
- assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear())
- .isInstanceOf(PartitionedRegionPartialClearException.class);
- });
-
- // Let asyncInvocations finish.
- for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
- asyncInvocation.await();
- }
- }
-
- /**
- * The test does the following (clear coordinator is chosen through parameters):
- * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
- * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so
- * participates on the clear operation).
- * - Launches one thread per VM to continuously execute putAll/removeAll for a given time.
- * - Clears the Partition Region (at this point the non-coordinator is restarted).
- * - Asserts that, after the clear has finished, the Region Buckets are consistent across members.
- */
- @Test
- @Parameters(method = "coordinators")
- @TestCaseName("[{index}] {method}(Coordinator:{0})")
- public void clearOnRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced(
- TestVM coordinatorVM) throws InterruptedException {
- parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT);
- server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
-
- // Let all VMs continuously execute putAll/removeAll for 30 seconds.
- final int workMillis = 30000;
- List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
- server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)),
- accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
-
- // Retry the clear operation on the region until success (server2 will go down, but other
- // members will eventually become primary for those buckets previously hosted by server2).
- executeClearWithRetry(getVM(coordinatorVM.vmNumber));
-
- // Wait for member to get back online.
- server2.invoke(() -> {
- cacheRule.createCache();
- initDataStore(RegionShortcut.PARTITION_REDUNDANT);
- await().untilAsserted(
- () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
- });
-
- // Let asyncInvocations finish.
- for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
- asyncInvocation.await();
- }
-
- // Assert Region Buckets are consistent.
- asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
- accessor.invoke(this::assertRegionBucketsConsistency);
- }
-
- /**
- * The test does the following (clear coordinator is chosen through parameters):
- * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
- * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so
- * participates on the clear operation).
- * - Launches one thread per VM to continuously execute putAll/removeAll for a given time.
- * - Clears the Partition Region (at this point the non-coordinator is restarted).
- * - Asserts that the clear operation failed with PartitionedRegionPartialClearException (primary
- * buckets on the the restarted members are not available).
- */
- @Test
- @Parameters(method = "coordinators")
- @TestCaseName("[{index}] {method}(Coordinator:{0})")
- public void clearOnNonRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldFailWhenNonCoordinatorMembersAreBounced(
- TestVM coordinatorVM) throws InterruptedException {
- parametrizedSetup(RegionShortcut.PARTITION);
- server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
-
- final int workMillis = 30000;
- List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
- server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)),
- accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
-
- // Clear the region.
- getVM(coordinatorVM.vmNumber).invoke(() -> {
- assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear())
- .isInstanceOf(PartitionedRegionPartialClearException.class);
- });
-
- // Let asyncInvocations finish.
- for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
- asyncInvocation.await();
- }
- }
-
private enum TestVM {
ACCESSOR(0), SERVER1(1), SERVER2(2);