Posted to commits@geode.apache.org by nn...@apache.org on 2021/04/30 01:16:50 UTC

[geode] 12/17: GEODE-9132: PartitionedRegionClearWithConcurrentOperationsDUnitTest cleanup 2

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 5e007a606b8377ebcbe45e3a2a512470aaa0a563
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Mon Apr 19 16:16:17 2021 -0700

    GEODE-9132: PartitionedRegionClearWithConcurrentOperationsDUnitTest cleanup 2
---
 ...gionClearWithConcurrentOperationsDUnitTest.java | 782 ++++++++++-----------
 1 file changed, 378 insertions(+), 404 deletions(-)
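
The main change below replaces the method-based JUnitParams providers
(coordinators(), coordinatorsAndRegionTypes(), regionTypes()) with inline
@Parameters values on each test method. For reference, here is a minimal,
self-contained sketch of that inline pattern, assuming the JUnitParams runner
(which the test already uses via @Parameters/@TestCaseName); the class and
method names are hypothetical, not part of this commit:

    import static org.junit.Assert.assertNotNull;
    import static org.junit.Assert.assertTrue;

    import junitparams.JUnitParamsRunner;
    import junitparams.Parameters;
    import junitparams.naming.TestCaseName;
    import org.junit.Test;
    import org.junit.runner.RunWith;

    @RunWith(JUnitParamsRunner.class)
    public class InlineParametersSketchTest {

      public enum Role {
        SERVER1, ACCESSOR
      }

      @Test
      @Parameters({"SERVER1,PARTITION", "ACCESSOR,PARTITION_REDUNDANT"})
      @TestCaseName("[{index}] {method}(Coordinator:{0}, RegionType:{1})")
      public void runsOncePerCsvRow(Role coordinator, String regionType) {
        // JUnitParams expands each CSV string into one test case and converts
        // the comma-separated values to the declared parameter types (enum
        // constants are matched by name).
        assertNotNull(coordinator);
        assertTrue(regionType.startsWith("PARTITION"));
      }
    }

Each CSV row expands to one named test case, which is how the inline
annotations in the diff below bind the TestVM and RegionShortcut values.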

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
index c9a1e5b..b2aacc0 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
@@ -72,8 +72,6 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
 
   private static final int BUCKETS = 13;
   private static final String REGION_NAME = "PartitionedRegion";
-  private static final String TEST_CASE_NAME =
-      "[{index}] {method}(Coordinator:{0}, RegionType:{1})";
 
   @Rule
   public DistributedRule distributedRule = new DistributedRule(3);
@@ -84,32 +82,6 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   private VM server2;
   private VM accessor;
 
-  @SuppressWarnings("unused")
-  static TestVM[] coordinators() {
-    return new TestVM[] {
-        TestVM.SERVER1, TestVM.ACCESSOR
-    };
-  }
-
-  @SuppressWarnings("unused")
-  static Object[] coordinatorsAndRegionTypes() {
-    List<Object[]> parameters = new ArrayList<>();
-    RegionShortcut[] regionShortcuts = regionTypes();
-
-    Arrays.stream(regionShortcuts).forEach(regionShortcut -> {
-      parameters.add(new Object[] {TestVM.SERVER1, regionShortcut});
-      parameters.add(new Object[] {TestVM.ACCESSOR, regionShortcut});
-    });
-
-    return parameters.toArray();
-  }
-
-  private static RegionShortcut[] regionTypes() {
-    return new RegionShortcut[] {
-        RegionShortcut.PARTITION, RegionShortcut.PARTITION_REDUNDANT
-    };
-  }
-
   @Before
   public void setUp() throws Exception {
     server1 = getVM(TestVM.SERVER1.vmNumber);
@@ -117,127 +89,414 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
     accessor = getVM(TestVM.ACCESSOR.vmNumber);
   }
 
-  private void initAccessor(RegionShortcut regionShortcut) {
-    PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>()
-        .setTotalNumBuckets(BUCKETS)
-        .setLocalMaxMemory(0)
-        .create();
+  /**
+   * The test does the following (clear coordinator and regionType are parametrized):
+   * - Launches one thread per VM to continuously execute removes, puts and gets for a given time.
+   * - Clears the Partition Region continuously every X milliseconds for a given time.
+   * - Asserts that, after the clears have finished, the Region Buckets are consistent across
+   * members.
+   */
+  @Test
+  @Parameters({"SERVER1,PARTITION", "ACCESSOR,PARTITION",
+      "SERVER1,PARTITION_REDUNDANT", "ACCESSOR,PARTITION_REDUNDANT"})
+  @TestCaseName("[{index}] {method}(Coordinator:{0}, RegionType:{1})")
+  public void clearWithConcurrentPutGetRemoveShouldWorkCorrectly(TestVM coordinatorVM,
+      RegionShortcut regionShortcut) throws InterruptedException {
+    parametrizedSetup(regionShortcut);
 
-    cacheRule.getCache().createRegionFactory(regionShortcut)
-        .setPartitionAttributes(attrs)
-        .create(REGION_NAME);
+    // Let all VMs continuously execute puts, gets and removes for 60 seconds.
+    final int workMillis = 60000;
+    final int entries = 15000;
+    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+        server1.invokeAsync(() -> executePuts(entries, workMillis)),
+        server2.invokeAsync(() -> executeGets(entries, workMillis)),
+        accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
 
-  }
+    // Clear the region every second for 60 seconds.
+    getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 1000));
 
-  private void initDataStore(RegionShortcut regionShortcut) {
-    PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>()
-        .setTotalNumBuckets(BUCKETS)
-        .create();
+    // Let asyncInvocations finish.
+    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+      asyncInvocation.await();
+    }
 
-    cacheRule.getCache().createRegionFactory(regionShortcut)
-        .setPartitionAttributes(attrs)
-        .create(REGION_NAME);
+    // Assert Region Buckets are consistent.
+    asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
+    accessor.invoke(this::assertRegionBucketsConsistency);
   }
 
-  private void parametrizedSetup(RegionShortcut regionShortcut) {
-    server1.invoke(() -> initDataStore(regionShortcut));
-    server2.invoke(() -> initDataStore(regionShortcut));
-    accessor.invoke(() -> initAccessor(regionShortcut));
-  }
+  /**
+   * The test does the following (clear coordinator and regionType are parametrized):
+   * - Launches two threads per VM to continuously execute putAll and removeAll for a given time.
+   * - Clears the Partition Region continuously every X milliseconds for a given time.
+   * - Asserts that, after the clears have finished, the Region Buckets are consistent across
+   * members.
+   */
+  @Test
+  @Parameters({"SERVER1,PARTITION", "ACCESSOR,PARTITION",
+      "SERVER1,PARTITION_REDUNDANT", "ACCESSOR,PARTITION_REDUNDANT"})
+  @TestCaseName("[{index}] {method}(Coordinator:{0}, RegionType:{1})")
+  public void clearWithConcurrentPutAllRemoveAllShouldWorkCorrectly(TestVM coordinatorVM,
+      RegionShortcut regionShortcut) throws InterruptedException {
+    parametrizedSetup(regionShortcut);
 
-  private void waitForSilence() {
-    DMStats dmStats = cacheRule.getSystem().getDistributionManager().getStats();
-    PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
-    PartitionedRegionStats partitionedRegionStats = region.getPrStats();
+    // Let all VMs continuously execute putAll and removeAll for 15 seconds.
+    final int workMillis = 15000;
+    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+        server1.invokeAsync(() -> executePutAlls(0, 2000, workMillis)),
+        server1.invokeAsync(() -> executeRemoveAlls(0, 2000, workMillis)),
+        server2.invokeAsync(() -> executePutAlls(2000, 4000, workMillis)),
+        server2.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)),
+        accessor.invokeAsync(() -> executePutAlls(4000, 6000, workMillis)),
+        accessor.invokeAsync(() -> executeRemoveAlls(4000, 6000, workMillis)));
 
-    await().untilAsserted(() -> {
-      assertThat(dmStats.getReplyWaitsInProgress()).isEqualTo(0);
-      assertThat(partitionedRegionStats.getVolunteeringInProgress()).isEqualTo(0);
-      assertThat(partitionedRegionStats.getBucketCreatesInProgress()).isEqualTo(0);
-      assertThat(partitionedRegionStats.getPrimaryTransfersInProgress()).isEqualTo(0);
-      assertThat(partitionedRegionStats.getRebalanceBucketCreatesInProgress()).isEqualTo(0);
-      assertThat(partitionedRegionStats.getRebalancePrimaryTransfersInProgress()).isEqualTo(0);
-    });
+    // Clear the region every half second for 15 seconds.
+    getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 500));
+
+    // Let asyncInvocations finish.
+    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+      asyncInvocation.await();
+    }
+
+    // Assert Region Buckets are consistent.
+    asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
+    accessor.invoke(this::assertRegionBucketsConsistency);
   }
 
   /**
-   * Populates the region and verifies the data on the selected VMs.
+   * The test does the following (regionType is parametrized):
+   * - Populates the Partition Region.
+   * - Verifies that the entries are synchronized on all members.
+   * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop the
+   * coordinator VM while the clear is in progress.
+   * - Clears the Partition Region (at this point the coordinator is restarted).
+   * - Asserts that, after the member joins again, the Region Buckets are consistent.
    */
-  private void populateRegion(VM feeder, int entryCount, Iterable<VM> vms) {
-    feeder.invoke(() -> {
+  @Test
+  @Parameters({"PARTITION", "PARTITION_REDUNDANT"})
+  @TestCaseName("[{index}] {method}(RegionType:{0})")
+  public void clearShouldFailWhenCoordinatorMemberIsBounced(RegionShortcut regionShortcut) {
+    parametrizedSetup(regionShortcut);
+    final int entries = 1000;
+    populateRegion(accessor, entries, asList(accessor, server1, server2));
+
+    // Set the MemberKiller (configured to crash the clear coordinator) and try to clear the region.
+    server1.invoke(() -> {
+      DistributionMessageObserver.setInstance(new MemberKiller(true));
       Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
-      IntStream.range(0, entryCount).forEach(i -> region.put(String.valueOf(i), "Value_" + i));
+      assertThatThrownBy(region::clear)
+          .isInstanceOf(DistributedSystemDisconnectedException.class)
+          .hasCauseInstanceOf(ForcedDisconnectException.class);
     });
 
-    vms.forEach(vm -> vm.invoke(() -> {
-      waitForSilence();
-      Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+    // Wait for member to get back online and assign all buckets.
+    server1.invoke(() -> {
+      cacheRule.createCache();
+      initDataStore(regionShortcut);
+      await().untilAsserted(
+          () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
+      PartitionRegionHelper.assignBucketsToPartitions(cacheRule.getCache().getRegion(REGION_NAME));
+    });
 
-      IntStream.range(0, entryCount)
-          .forEach(i -> assertThat(region.get(String.valueOf(i))).isEqualTo("Value_" + i));
-    }));
+    // Assert Region Buckets are consistent.
+    asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
+    accessor.invoke(this::assertRegionBucketsConsistency);
   }
 
   /**
-   * Asserts that the RegionVersionVectors for both buckets are consistent.
-   *
-   * @param bucketId Id of the bucket to compare.
-   * @param bucketDump1 First bucketDump.
-   * @param bucketDump2 Second bucketDump.
+   * The test does the following (clear coordinator is chosen through parameters):
+   * - Populates the Partition Region.
+   * - Verifies that the entries are synchronized on all members.
+   * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
+   * non-coordinator VM while the clear is in progress (the member hosts primary buckets, so it
+   * participates in the clear operation).
+   * - Launches two threads per VM to continuously execute gets, puts and removes for a given time.
+   * - Clears the Partition Region (at this point the non-coordinator is restarted).
+   * - Asserts that, after the clear has finished, the Region Buckets are consistent across members.
    */
-  private void assertRegionVersionVectorsConsistency(int bucketId, BucketDump bucketDump1,
-      BucketDump bucketDump2) {
-    RegionVersionVector<?> rvv1 = bucketDump1.getRvv();
-    RegionVersionVector<?> rvv2 = bucketDump2.getRvv();
+  @Test
+  @Parameters({"SERVER1", "ACCESSOR"})
+  @TestCaseName("[{index}] {method}(Coordinator:{0})")
+  public void clearOnRedundantPartitionRegionWithConcurrentPutGetRemoveShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced(
+      TestVM coordinatorVM) throws InterruptedException {
+    parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT);
+    final int entries = 7500;
+    populateRegion(accessor, entries, asList(accessor, server1, server2));
+    server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
 
-    if (rvv1 == null) {
-      assertThat(rvv2)
-          .as("Bucket " + bucketId + " has an RVV on member " + bucketDump2.getMember()
-              + ", but does not on member " + bucketDump1.getMember())
-          .isNull();
-    }
+    // Let all VMs (except the one to be killed) execute gets, puts and removes for 30 seconds.
+    final int workMillis = 30000;
+    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+        server1.invokeAsync(() -> executeGets(entries, workMillis)),
+        server1.invokeAsync(() -> executePuts(entries, workMillis)),
+        accessor.invokeAsync(() -> executeGets(entries, workMillis)),
+        accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
 
-    if (rvv2 == null) {
-      assertThat(rvv1)
-          .as("Bucket " + bucketId + " has an RVV on member " + bucketDump1.getMember()
-              + ", but does not on member " + bucketDump2.getMember())
-          .isNull();
-    }
+    // Retry the clear operation on the region until success (server2 will go down, but other
+    // members will eventually become primary for those buckets previously hosted by server2).
+    executeClearWithRetry(getVM(coordinatorVM.vmNumber));
 
-    assertThat(rvv1).isNotNull();
-    assertThat(rvv2).isNotNull();
-    Map<VersionSource<?>, RegionVersionHolder<?>> rvv2Members =
-        new HashMap<>(rvv1.getMemberToVersion());
-    Map<VersionSource<?>, RegionVersionHolder<?>> rvv1Members =
-        new HashMap<>(rvv1.getMemberToVersion());
-    for (Map.Entry<VersionSource<?>, RegionVersionHolder<?>> entry : rvv1Members.entrySet()) {
-      VersionSource<?> memberId = entry.getKey();
-      RegionVersionHolder<?> versionHolder1 = entry.getValue();
-      RegionVersionHolder<?> versionHolder2 = rvv2Members.remove(memberId);
-      assertThat(versionHolder1)
-          .as("RegionVersionVector for bucket " + bucketId + " on member " + bucketDump1.getMember()
-              + " is not consistent with member " + bucketDump2.getMember())
-          .isEqualTo(versionHolder2);
+    // Wait for member to get back online.
+    server2.invoke(() -> {
+      cacheRule.createCache();
+      initDataStore(RegionShortcut.PARTITION_REDUNDANT);
+      await().untilAsserted(
+          () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
+    });
+
+    // Let asyncInvocations finish.
+    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+      asyncInvocation.await();
     }
+
+    // Assert Region Buckets are consistent.
+    asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
+    accessor.invoke(this::assertRegionBucketsConsistency);
   }
 
   /**
-   * Asserts that the region data is consistent across buckets.
+   * The test does the following (clear coordinator is chosen through parameters):
+   * - Populates the Partition Region.
+   * - Verifies that the entries are synchronized on all members.
+   * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
+   * non-coordinator VM while the clear is in progress (the member hosts primary buckets, so it
+   * participates in the clear operation).
+   * - Launches two threads per VM to continuously execute gets, puts and removes for a given time.
+   * - Clears the Partition Region (at this point the non-coordinator is restarted).
+   * - Asserts that the clear operation failed with PartitionedRegionPartialClearException (primary
+   * buckets on the restarted member are not available).
    */
-  private void assertRegionBucketsConsistency() throws ForceReattemptException {
-    PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
-    // Redundant copies + 1 primary.
-    int expectedCopies = region.getRedundantCopies() + 1;
+  @Test
+  @Parameters({"SERVER1", "ACCESSOR"})
+  @TestCaseName("[{index}] {method}(Coordinator:{0})")
+  public void clearOnNonRedundantPartitionRegionWithConcurrentPutGetRemoveShouldFailWhenNonCoordinatorMembersAreBounced(
+      TestVM coordinatorVM) throws InterruptedException {
+    parametrizedSetup(RegionShortcut.PARTITION);
+    final int entries = 7500;
+    populateRegion(accessor, entries, asList(accessor, server1, server2));
+    server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
 
-    for (int bId = 0; bId < BUCKETS; bId++) {
-      final int bucketId = bId;
-      List<BucketDump> bucketDumps = region.getAllBucketEntries(bucketId);
-      assertThat(bucketDumps.size())
-          .as("Bucket " + bucketId + " should have " + expectedCopies + " copies, but has "
-              + bucketDumps.size())
-          .isEqualTo(expectedCopies);
+    // Let all VMs (except the one to be killed) execute gets, puts and removes for 30 seconds.
+    final int workMillis = 30000;
+    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+        server1.invokeAsync(() -> executeGets(entries, workMillis)),
+        server1.invokeAsync(() -> executePuts(entries, workMillis)),
+        accessor.invokeAsync(() -> executeGets(entries, workMillis)),
+        accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
 
-      // Check that all copies of the bucket have the same data.
+    // Clear the region.
+    getVM(coordinatorVM.vmNumber).invoke(() -> {
+      assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear())
+          .isInstanceOf(PartitionedRegionPartialClearException.class);
+    });
+
+    // Let asyncInvocations finish.
+    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+      asyncInvocation.await();
+    }
+  }
+
+  /**
+   * The test does the following (clear coordinator is chosen through parameters):
+   * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
+   * non-coordinator VM while the clear is in progress (the member hosts primary buckets, so it
+   * participates in the clear operation).
+   * - Launches one thread per VM to continuously execute putAll/removeAll for a given time.
+   * - Clears the Partition Region (at this point the non-coordinator is restarted).
+   * - Asserts that, after the clear has finished, the Region Buckets are consistent across members.
+   */
+  @Test
+  @Parameters({"SERVER1", "ACCESSOR"})
+  @TestCaseName("[{index}] {method}(Coordinator:{0})")
+  public void clearOnRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced(
+      TestVM coordinatorVM) throws InterruptedException {
+    parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT);
+    server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
+
+    // Let all VMs continuously execute putAll/removeAll for 30 seconds.
+    final int workMillis = 30000;
+    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+        server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)),
+        accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
+
+    // Retry the clear operation on the region until success (server2 will go down, but other
+    // members will eventually become primary for those buckets previously hosted by server2).
+    executeClearWithRetry(getVM(coordinatorVM.vmNumber));
+
+    // Wait for member to get back online.
+    server2.invoke(() -> {
+      cacheRule.createCache();
+      initDataStore(RegionShortcut.PARTITION_REDUNDANT);
+      await().untilAsserted(
+          () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
+    });
+
+    // Let asyncInvocations finish.
+    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+      asyncInvocation.await();
+    }
+
+    // Assert Region Buckets are consistent.
+    asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
+    accessor.invoke(this::assertRegionBucketsConsistency);
+  }
+
+  /**
+   * The test does the following (clear coordinator is chosen through parameters):
+   * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
+   * non-coordinator VM while the clear is in progress (the member hosts primary buckets, so it
+   * participates in the clear operation).
+   * - Launches one thread per VM to continuously execute putAll/removeAll for a given time.
+   * - Clears the Partition Region (at this point the non-coordinator is restarted).
+   * - Asserts that the clear operation failed with PartitionedRegionPartialClearException (primary
+   * buckets on the restarted member are not available).
+   */
+  @Test
+  @Parameters({"SERVER1", "ACCESSOR"})
+  @TestCaseName("[{index}] {method}(Coordinator:{0})")
+  public void clearOnNonRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldFailWhenNonCoordinatorMembersAreBounced(
+      TestVM coordinatorVM) throws InterruptedException {
+    parametrizedSetup(RegionShortcut.PARTITION);
+    server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
+
+    final int workMillis = 30000;
+    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+        server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)),
+        accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
+
+    // Clear the region.
+    getVM(coordinatorVM.vmNumber).invoke(() -> {
+      assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear())
+          .isInstanceOf(PartitionedRegionPartialClearException.class);
+    });
+
+    // Let asyncInvocations finish.
+    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+      asyncInvocation.await();
+    }
+  }
+
+  private void initAccessor(RegionShortcut regionShortcut) {
+    PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>()
+        .setTotalNumBuckets(BUCKETS)
+        .setLocalMaxMemory(0)
+        .create();
+
+    cacheRule.getCache().createRegionFactory(regionShortcut)
+        .setPartitionAttributes(attrs)
+        .create(REGION_NAME);
+  }
+
+  private void initDataStore(RegionShortcut regionShortcut) {
+    PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>()
+        .setTotalNumBuckets(BUCKETS)
+        .create();
+
+    cacheRule.getCache().createRegionFactory(regionShortcut)
+        .setPartitionAttributes(attrs)
+        .create(REGION_NAME);
+  }
+
+  private void parametrizedSetup(RegionShortcut regionShortcut) {
+    server1.invoke(() -> initDataStore(regionShortcut));
+    server2.invoke(() -> initDataStore(regionShortcut));
+    accessor.invoke(() -> initAccessor(regionShortcut));
+  }
+
+  private void waitForSilence() {
+    DMStats dmStats = cacheRule.getSystem().getDistributionManager().getStats();
+    PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
+    PartitionedRegionStats partitionedRegionStats = region.getPrStats();
+
+    await().untilAsserted(() -> {
+      assertThat(dmStats.getReplyWaitsInProgress()).isEqualTo(0);
+      assertThat(partitionedRegionStats.getVolunteeringInProgress()).isEqualTo(0);
+      assertThat(partitionedRegionStats.getBucketCreatesInProgress()).isEqualTo(0);
+      assertThat(partitionedRegionStats.getPrimaryTransfersInProgress()).isEqualTo(0);
+      assertThat(partitionedRegionStats.getRebalanceBucketCreatesInProgress()).isEqualTo(0);
+      assertThat(partitionedRegionStats.getRebalancePrimaryTransfersInProgress()).isEqualTo(0);
+    });
+  }
+
+  /**
+   * Populates the region and verifies the data on the selected VMs.
+   */
+  private void populateRegion(VM feeder, int entryCount, Iterable<VM> vms) {
+    feeder.invoke(() -> {
+      Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+      IntStream.range(0, entryCount).forEach(i -> region.put(String.valueOf(i), "Value_" + i));
+    });
+
+    vms.forEach(vm -> vm.invoke(() -> {
+      waitForSilence();
+      Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+
+      IntStream.range(0, entryCount)
+          .forEach(i -> assertThat(region.get(String.valueOf(i))).isEqualTo("Value_" + i));
+    }));
+  }
+
+  /**
+   * Asserts that the RegionVersionVectors of two copies of the same bucket are consistent.
+   *
+   * @param bucketId Id of the bucket to compare.
+   * @param bucketDump1 First bucketDump.
+   * @param bucketDump2 Second bucketDump.
+   */
+  private void assertRegionVersionVectorsConsistency(int bucketId, BucketDump bucketDump1,
+      BucketDump bucketDump2) {
+    RegionVersionVector<?> rvv1 = bucketDump1.getRvv();
+    RegionVersionVector<?> rvv2 = bucketDump2.getRvv();
+
+    if (rvv1 == null) {
+      assertThat(rvv2)
+          .as("Bucket " + bucketId + " has an RVV on member " + bucketDump2.getMember()
+              + ", but does not on member " + bucketDump1.getMember())
+          .isNull();
+    }
+
+    if (rvv2 == null) {
+      assertThat(rvv1)
+          .as("Bucket " + bucketId + " has an RVV on member " + bucketDump1.getMember()
+              + ", but does not on member " + bucketDump2.getMember())
+          .isNull();
+    }
+
+    assertThat(rvv1).isNotNull();
+    assertThat(rvv2).isNotNull();
+    Map<VersionSource<?>, RegionVersionHolder<?>> rvv2Members =
+        new HashMap<>(rvv2.getMemberToVersion());
+    Map<VersionSource<?>, RegionVersionHolder<?>> rvv1Members =
+        new HashMap<>(rvv1.getMemberToVersion());
+    for (Map.Entry<VersionSource<?>, RegionVersionHolder<?>> entry : rvv1Members.entrySet()) {
+      VersionSource<?> memberId = entry.getKey();
+      RegionVersionHolder<?> versionHolder1 = entry.getValue();
+      RegionVersionHolder<?> versionHolder2 = rvv2Members.remove(memberId);
+      assertThat(versionHolder1)
+          .as("RegionVersionVector for bucket " + bucketId + " on member " + bucketDump1.getMember()
+              + " is not consistent with member " + bucketDump2.getMember())
+          .isEqualTo(versionHolder2);
+    }
+  }
+
+  /**
+   * Asserts that the region data is consistent across all copies of each bucket.
+   */
+  private void assertRegionBucketsConsistency() throws ForceReattemptException {
+    PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
+    // Redundant copies + 1 primary.
+    int expectedCopies = region.getRedundantCopies() + 1;
+
+    for (int bId = 0; bId < BUCKETS; bId++) {
+      final int bucketId = bId;
+      List<BucketDump> bucketDumps = region.getAllBucketEntries(bucketId);
+      assertThat(bucketDumps.size())
+          .as("Bucket " + bucketId + " should have " + expectedCopies + " copies, but has "
+              + bucketDumps.size())
+          .isEqualTo(expectedCopies);
+
+      // Check that all copies of the bucket have the same data.
       if (bucketDumps.size() > 1) {
         BucketDump firstDump = bucketDumps.get(0);
 
@@ -375,291 +634,6 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
     }
   }
 
-  /**
-   * The test does the following (clear coordinator and regionType are parametrized):
-   * - Launches one thread per VM to continuously execute removes, puts and gets for a given time.
-   * - Clears the Partition Region continuously every X milliseconds for a given time.
-   * - Asserts that, after the clears have finished, the Region Buckets are consistent across
-   * members.
-   */
-  @Test
-  @TestCaseName(TEST_CASE_NAME)
-  @Parameters(method = "coordinatorsAndRegionTypes")
-  public void clearWithConcurrentPutGetRemoveShouldWorkCorrectly(TestVM coordinatorVM,
-      RegionShortcut regionShortcut) throws InterruptedException {
-    parametrizedSetup(regionShortcut);
-
-    // Let all VMs continuously execute puts and gets for 60 seconds.
-    final int workMillis = 60000;
-    final int entries = 15000;
-    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
-        server1.invokeAsync(() -> executePuts(entries, workMillis)),
-        server2.invokeAsync(() -> executeGets(entries, workMillis)),
-        accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
-
-    // Clear the region every second for 60 seconds.
-    getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 1000));
-
-    // Let asyncInvocations finish.
-    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
-      asyncInvocation.await();
-    }
-
-    // Assert Region Buckets are consistent.
-    asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
-    accessor.invoke(this::assertRegionBucketsConsistency);
-  }
-
-  /**
-   * The test does the following (clear coordinator and regionType are parametrized):
-   * - Launches two threads per VM to continuously execute putAll and removeAll for a given time.
-   * - Clears the Partition Region continuously every X milliseconds for a given time.
-   * - Asserts that, after the clears have finished, the Region Buckets are consistent across
-   * members.
-   */
-  @Test
-  @TestCaseName(TEST_CASE_NAME)
-  @Parameters(method = "coordinatorsAndRegionTypes")
-  public void clearWithConcurrentPutAllRemoveAllShouldWorkCorrectly(TestVM coordinatorVM,
-      RegionShortcut regionShortcut) throws InterruptedException {
-    parametrizedSetup(regionShortcut);
-
-    // Let all VMs continuously execute putAll and removeAll for 15 seconds.
-    final int workMillis = 15000;
-    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
-        server1.invokeAsync(() -> executePutAlls(0, 2000, workMillis)),
-        server1.invokeAsync(() -> executeRemoveAlls(0, 2000, workMillis)),
-        server2.invokeAsync(() -> executePutAlls(2000, 4000, workMillis)),
-        server2.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)),
-        accessor.invokeAsync(() -> executePutAlls(4000, 6000, workMillis)),
-        accessor.invokeAsync(() -> executeRemoveAlls(4000, 6000, workMillis)));
-
-    // Clear the region every half second for 15 seconds.
-    getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 500));
-
-    // Let asyncInvocations finish.
-    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
-      asyncInvocation.await();
-    }
-
-    // Assert Region Buckets are consistent.
-    asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
-    accessor.invoke(this::assertRegionBucketsConsistency);
-  }
-
-  /**
-   * The test does the following (regionType is parametrized):
-   * - Populates the Partition Region.
-   * - Verifies that the entries are synchronized on all members.
-   * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop the
-   * coordinator VM while the clear is in progress.
-   * - Clears the Partition Region (at this point the coordinator is restarted).
-   * - Asserts that, after the member joins again, the Region Buckets are consistent.
-   */
-  @Test
-  @TestCaseName("[{index}] {method}(RegionType:{0})")
-  @Parameters(method = "regionTypes")
-  public void clearShouldFailWhenCoordinatorMemberIsBounced(RegionShortcut regionShortcut) {
-    parametrizedSetup(regionShortcut);
-    final int entries = 1000;
-    populateRegion(accessor, entries, asList(accessor, server1, server2));
-
-    // Set the CoordinatorMemberKiller and try to clear the region
-    server1.invoke(() -> {
-      DistributionMessageObserver.setInstance(new MemberKiller(true));
-      Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
-      assertThatThrownBy(region::clear)
-          .isInstanceOf(DistributedSystemDisconnectedException.class)
-          .hasCauseInstanceOf(ForcedDisconnectException.class);
-    });
-
-    // Wait for member to get back online and assign all buckets.
-    server1.invoke(() -> {
-      cacheRule.createCache();
-      initDataStore(regionShortcut);
-      await().untilAsserted(
-          () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
-      PartitionRegionHelper.assignBucketsToPartitions(cacheRule.getCache().getRegion(REGION_NAME));
-    });
-
-    // Assert Region Buckets are consistent.
-    asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
-    accessor.invoke(this::assertRegionBucketsConsistency);
-  }
-
-  /**
-   * The test does the following (clear coordinator is chosen through parameters):
-   * - Populates the Partition Region.
-   * - Verifies that the entries are synchronized on all members.
-   * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
-   * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so
-   * participates on the clear operation).
-   * - Launches two threads per VM to continuously execute gets, puts and removes for a given time.
-   * - Clears the Partition Region (at this point the non-coordinator is restarted).
-   * - Asserts that, after the clear has finished, the Region Buckets are consistent across members.
-   */
-  @Test
-  @Parameters(method = "coordinators")
-  @TestCaseName("[{index}] {method}(Coordinator:{0})")
-  public void clearOnRedundantPartitionRegionWithConcurrentPutGetRemoveShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced(
-      TestVM coordinatorVM) throws InterruptedException {
-    parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT);
-    final int entries = 7500;
-    populateRegion(accessor, entries, asList(accessor, server1, server2));
-    server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
-
-    // Let all VMs (except the one to kill) continuously execute gets, put and removes for 30"
-    final int workMillis = 30000;
-    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
-        server1.invokeAsync(() -> executeGets(entries, workMillis)),
-        server1.invokeAsync(() -> executePuts(entries, workMillis)),
-        accessor.invokeAsync(() -> executeGets(entries, workMillis)),
-        accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
-
-    // Retry the clear operation on the region until success (server2 will go down, but other
-    // members will eventually become primary for those buckets previously hosted by server2).
-    executeClearWithRetry(getVM(coordinatorVM.vmNumber));
-
-    // Wait for member to get back online.
-    server2.invoke(() -> {
-      cacheRule.createCache();
-      initDataStore(RegionShortcut.PARTITION_REDUNDANT);
-      await().untilAsserted(
-          () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
-    });
-
-    // Let asyncInvocations finish.
-    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
-      asyncInvocation.await();
-    }
-
-    // Assert Region Buckets are consistent.
-    asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
-    accessor.invoke(this::assertRegionBucketsConsistency);
-  }
-
-  /**
-   * The test does the following (clear coordinator is chosen through parameters):
-   * - Populates the Partition Region.
-   * - Verifies that the entries are synchronized on all members.
-   * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
-   * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so
-   * participates on the clear operation).
-   * - Launches two threads per VM to continuously execute gets, puts and removes for a given time.
-   * - Clears the Partition Region (at this point the non-coordinator is restarted).
-   * - Asserts that the clear operation failed with PartitionedRegionPartialClearException (primary
-   * buckets on the the restarted members are not available).
-   */
-  @Test
-  @Parameters(method = "coordinators")
-  @TestCaseName("[{index}] {method}(Coordinator:{0})")
-  public void clearOnNonRedundantPartitionRegionWithConcurrentPutGetRemoveShouldFailWhenNonCoordinatorMembersAreBounced(
-      TestVM coordinatorVM) throws InterruptedException {
-    parametrizedSetup(RegionShortcut.PARTITION);
-    final int entries = 7500;
-    populateRegion(accessor, entries, asList(accessor, server1, server2));
-    server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
-
-    // Let all VMs (except the one to kill) continuously execute gets, put and removes for 30"
-    final int workMillis = 30000;
-    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
-        server1.invokeAsync(() -> executeGets(entries, workMillis)),
-        server1.invokeAsync(() -> executePuts(entries, workMillis)),
-        accessor.invokeAsync(() -> executeGets(entries, workMillis)),
-        accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
-
-    // Clear the region.
-    getVM(coordinatorVM.vmNumber).invoke(() -> {
-      assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear())
-          .isInstanceOf(PartitionedRegionPartialClearException.class);
-    });
-
-    // Let asyncInvocations finish.
-    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
-      asyncInvocation.await();
-    }
-  }
-
-  /**
-   * The test does the following (clear coordinator is chosen through parameters):
-   * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
-   * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so
-   * participates on the clear operation).
-   * - Launches one thread per VM to continuously execute putAll/removeAll for a given time.
-   * - Clears the Partition Region (at this point the non-coordinator is restarted).
-   * - Asserts that, after the clear has finished, the Region Buckets are consistent across members.
-   */
-  @Test
-  @Parameters(method = "coordinators")
-  @TestCaseName("[{index}] {method}(Coordinator:{0})")
-  public void clearOnRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced(
-      TestVM coordinatorVM) throws InterruptedException {
-    parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT);
-    server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
-
-    // Let all VMs continuously execute putAll/removeAll for 30 seconds.
-    final int workMillis = 30000;
-    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
-        server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)),
-        accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
-
-    // Retry the clear operation on the region until success (server2 will go down, but other
-    // members will eventually become primary for those buckets previously hosted by server2).
-    executeClearWithRetry(getVM(coordinatorVM.vmNumber));
-
-    // Wait for member to get back online.
-    server2.invoke(() -> {
-      cacheRule.createCache();
-      initDataStore(RegionShortcut.PARTITION_REDUNDANT);
-      await().untilAsserted(
-          () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
-    });
-
-    // Let asyncInvocations finish.
-    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
-      asyncInvocation.await();
-    }
-
-    // Assert Region Buckets are consistent.
-    asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
-    accessor.invoke(this::assertRegionBucketsConsistency);
-  }
-
-  /**
-   * The test does the following (clear coordinator is chosen through parameters):
-   * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
-   * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so
-   * participates on the clear operation).
-   * - Launches one thread per VM to continuously execute putAll/removeAll for a given time.
-   * - Clears the Partition Region (at this point the non-coordinator is restarted).
-   * - Asserts that the clear operation failed with PartitionedRegionPartialClearException (primary
-   * buckets on the the restarted members are not available).
-   */
-  @Test
-  @Parameters(method = "coordinators")
-  @TestCaseName("[{index}] {method}(Coordinator:{0})")
-  public void clearOnNonRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldFailWhenNonCoordinatorMembersAreBounced(
-      TestVM coordinatorVM) throws InterruptedException {
-    parametrizedSetup(RegionShortcut.PARTITION);
-    server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
-
-    final int workMillis = 30000;
-    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
-        server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)),
-        accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
-
-    // Clear the region.
-    getVM(coordinatorVM.vmNumber).invoke(() -> {
-      assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear())
-          .isInstanceOf(PartitionedRegionPartialClearException.class);
-    });
-
-    // Let asyncInvocations finish.
-    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
-      asyncInvocation.await();
-    }
-  }
-
   private enum TestVM {
     ACCESSOR(0), SERVER1(1), SERVER2(2);