Posted to commits@geode.apache.org by ji...@apache.org on 2020/10/02 19:29:02 UTC

[geode] 16/19: GEODE-7670: PR Clear with Concurrent Ops DUnitTests (#4848)

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git

commit f81724fea2bc28fbdc0503021135082b7a907261
Author: Juan José Ramos <ju...@users.noreply.github.com>
AuthorDate: Wed Jul 22 09:35:59 2020 +0100

    GEODE-7670: PR Clear with Concurrent Ops DUnitTests (#4848)
    
    Added distributed tests to verify that the clear operation on
    Partitioned Regions works as expected while other concurrent
    activity takes place on the cache: put, putAll, get, remove and
    removeAll operations, as well as members being added and removed.
---
 ...gionClearWithConcurrentOperationsDUnitTest.java | 747 +++++++++++++++++++++
 1 file changed, 747 insertions(+)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
new file mode 100644
index 0000000..fdb91c7
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.internal.util.ArrayUtils.asList;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.Serializable;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.ForcedDisconnectException;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.PartitionedRegionPartialClearException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.DistributedSystemDisconnectedException;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper;
+import org.apache.geode.internal.cache.versions.RegionVersionHolder;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
+import org.apache.geode.internal.cache.versions.VersionSource;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+
+/**
+ * Tests to verify that {@link PartitionedRegion#clear()} operation can be executed multiple times
+ * on the same region while other cache operations are being executed concurrently and members are
+ * added or removed.
+ */
+@RunWith(JUnitParamsRunner.class)
+public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements Serializable {
+  private static final int BUCKETS = 13;
+  private static final String REGION_NAME = "PartitionedRegion";
+  private static final String TEST_CASE_NAME =
+      "[{index}] {method}(Coordinator:{0}, RegionType:{1})";
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule(3);
+
+  @Rule
+  public CacheRule cacheRule = CacheRule.builder().createCacheInAll().build();
+
+  private VM server1;
+  private VM server2;
+  private VM accessor;
+
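+  /** Maps each test role to the DUnit VM number used by {@code getVM()}. */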
+  private enum TestVM {
+    ACCESSOR(0), SERVER1(1), SERVER2(2);
+
+    final int vmNumber;
+
+    TestVM(int vmNumber) {
+      this.vmNumber = vmNumber;
+    }
+  }
+
+  static RegionShortcut[] regionTypes() {
+    return new RegionShortcut[] {
+        RegionShortcut.PARTITION, RegionShortcut.PARTITION_REDUNDANT
+    };
+  }
+
+  @SuppressWarnings("unused")
+  static TestVM[] coordinators() {
+    return new TestVM[] {
+        TestVM.SERVER1, TestVM.ACCESSOR
+    };
+  }
+
+  @SuppressWarnings("unused")
+  static Object[] coordinatorsAndRegionTypes() {
+    ArrayList<Object[]> parameters = new ArrayList<>();
+    RegionShortcut[] regionShortcuts = regionTypes();
+
+    Arrays.stream(regionShortcuts).forEach(regionShortcut -> {
+      parameters.add(new Object[] {TestVM.SERVER1, regionShortcut});
+      parameters.add(new Object[] {TestVM.ACCESSOR, regionShortcut});
+    });
+
+    return parameters.toArray();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    server1 = getVM(TestVM.SERVER1.vmNumber);
+    server2 = getVM(TestVM.SERVER2.vmNumber);
+    accessor = getVM(TestVM.ACCESSOR.vmNumber);
+  }
+
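+  /**
+   * Creates the region on this member as an accessor: with local max memory set to zero it
+   * hosts no buckets, but it can still initiate operations such as clear.
+   */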
+  private void initAccessor(RegionShortcut regionShortcut) {
+    PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>()
+        .setTotalNumBuckets(BUCKETS)
+        .setLocalMaxMemory(0)
+        .create();
+
+    cacheRule.getCache().createRegionFactory(regionShortcut)
+        .setPartitionAttributes(attrs)
+        .create(REGION_NAME);
+  }
+
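+  /**
+   * Creates the region on this member as a data store that hosts buckets.
+   */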
+  private void initDataStore(RegionShortcut regionShortcut) {
+    PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>()
+        .setTotalNumBuckets(BUCKETS)
+        .create();
+
+    cacheRule.getCache().createRegionFactory(regionShortcut)
+        .setPartitionAttributes(attrs)
+        .create(REGION_NAME);
+  }
+
+  private void parametrizedSetup(RegionShortcut regionShortcut) {
+    server1.invoke(() -> initDataStore(regionShortcut));
+    server2.invoke(() -> initDataStore(regionShortcut));
+    accessor.invoke(() -> initAccessor(regionShortcut));
+  }
+
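+  /**
+   * Waits until no reply messages are pending and no bucket creations, primary transfers or
+   * rebalance operations are in progress, so region contents can be verified without racing
+   * in-flight distribution work.
+   */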
+  private void waitForSilence() {
+    DMStats dmStats = cacheRule.getSystem().getDistributionManager().getStats();
+    PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
+    PartitionedRegionStats partitionedRegionStats = region.getPrStats();
+
+    await().untilAsserted(() -> {
+      assertThat(dmStats.getReplyWaitsInProgress()).isEqualTo(0);
+      assertThat(partitionedRegionStats.getVolunteeringInProgress()).isEqualTo(0);
+      assertThat(partitionedRegionStats.getBucketCreatesInProgress()).isEqualTo(0);
+      assertThat(partitionedRegionStats.getPrimaryTransfersInProgress()).isEqualTo(0);
+      assertThat(partitionedRegionStats.getRebalanceBucketCreatesInProgress()).isEqualTo(0);
+      assertThat(partitionedRegionStats.getRebalancePrimaryTransfersInProgress()).isEqualTo(0);
+    });
+  }
+
+  /**
+   * Populates the region and verifies the data on the selected VMs.
+   */
+  private void populateRegion(VM feeder, int entryCount, List<VM> vms) {
+    feeder.invoke(() -> {
+      Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+      IntStream.range(0, entryCount).forEach(i -> region.put(String.valueOf(i), "Value_" + i));
+    });
+
+    vms.forEach(vm -> vm.invoke(() -> {
+      waitForSilence();
+      Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+
+      IntStream.range(0, entryCount)
+          .forEach(i -> assertThat(region.get(String.valueOf(i))).isEqualTo("Value_" + i));
+    }));
+  }
+
+  /**
+   * Asserts that the RegionVersionVectors for both copies of the bucket are consistent.
+   *
+   * @param bucketId Id of the bucket to compare.
+   * @param bucketDump1 First bucketDump.
+   * @param bucketDump2 Second bucketDump.
+   */
+  private void assertRegionVersionVectorsConsistency(int bucketId, BucketDump bucketDump1,
+      BucketDump bucketDump2) {
+    RegionVersionVector<?> rvv1 = bucketDump1.getRvv();
+    RegionVersionVector<?> rvv2 = bucketDump2.getRvv();
+
+    if (rvv1 == null) {
+      assertThat(rvv2)
+          .as("Bucket " + bucketId + " has an RVV on member " + bucketDump2.getMember()
+              + ", but does not on member " + bucketDump1.getMember())
+          .isNull();
+    }
+
+    if (rvv2 == null) {
+      assertThat(rvv1)
+          .as("Bucket " + bucketId + " has an RVV on member " + bucketDump1.getMember()
+              + ", but does not on member " + bucketDump2.getMember())
+          .isNull();
+    }
+
+    assertThat(rvv1).isNotNull();
+    assertThat(rvv2).isNotNull();
+    Map<VersionSource<?>, RegionVersionHolder<?>> rvv2Members =
+        new HashMap<>(rvv2.getMemberToVersion());
+    Map<VersionSource<?>, RegionVersionHolder<?>> rvv1Members =
+        new HashMap<>(rvv1.getMemberToVersion());
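+    // Every member's version holder must match across both copies of the bucket;
+    // a mismatch means the copies diverged during the concurrent operations.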
+    for (Map.Entry<VersionSource<?>, RegionVersionHolder<?>> entry : rvv1Members.entrySet()) {
+      VersionSource<?> memberId = entry.getKey();
+      RegionVersionHolder<?> versionHolder1 = entry.getValue();
+      RegionVersionHolder<?> versionHolder2 = rvv2Members.remove(memberId);
+      assertThat(versionHolder1)
+          .as("RegionVersionVector for bucket " + bucketId + " on member " + bucketDump1.getMember()
+              + " is not consistent with member " + bucketDump2.getMember())
+          .isEqualTo(versionHolder2);
+    }
+  }
+
+  /**
+   * Asserts that the region data is consistent across buckets.
+   */
+  private void assertRegionBucketsConsistency() throws ForceReattemptException {
+    List<BucketDump> bucketDumps;
+    PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
+    // Redundant copies + 1 primary.
+    int expectedCopies = region.getRedundantCopies() + 1;
+
+    for (int bId = 0; bId < BUCKETS; bId++) {
+      final int bucketId = bId;
+      bucketDumps = region.getAllBucketEntries(bucketId);
+      assertThat(bucketDumps.size())
+          .as("Bucket " + bucketId + " should have " + expectedCopies + " copies, but has "
+              + bucketDumps.size())
+          .isEqualTo(expectedCopies);
+
+      // Check that all copies of the bucket have the same data.
+      if (bucketDumps.size() > 1) {
+        BucketDump firstDump = bucketDumps.get(0);
+
+        for (int j = 1; j < bucketDumps.size(); j++) {
+          BucketDump otherDump = bucketDumps.get(j);
+          assertRegionVersionVectorsConsistency(bucketId, firstDump, otherDump);
+
+          await().untilAsserted(() -> assertThat(otherDump.getValues())
+              .as("Values for bucket " + bucketId + " on member " + otherDump.getMember()
+                  + " are not consistent with member " + firstDump.getMember())
+              .isEqualTo(firstDump.getValues()));
+
+          await().untilAsserted(() -> assertThat(otherDump.getVersions())
+              .as("Versions for bucket " + bucketId + " on member " + otherDump.getMember()
+                  + " are not consistent with member " + firstDump.getMember())
+              .isEqualTo(firstDump.getVersions()));
+        }
+      }
+    }
+  }
+
+  /**
+   * Continuously execute get operations on the PartitionedRegion for the given durationInMillis.
+   */
+  private void executeGets(final int numEntries, final long durationInMillis) {
+    Cache cache = cacheRule.getCache();
+    Region<String, String> region = cache.getRegion(REGION_NAME);
+    Instant finishTime = Instant.now().plusMillis(durationInMillis);
+
+    while (Instant.now().isBefore(finishTime)) {
+      // The region might have been cleared in the meantime, which is why we check for null.
+      IntStream.range(0, numEntries).forEach(i -> {
+        Optional<String> nullableValue = Optional.ofNullable(region.get(String.valueOf(i)));
+        nullableValue.ifPresent(value -> assertThat(value).isEqualTo("Value_" + i));
+      });
+    }
+  }
+
+  /**
+   * Continuously execute put operations on the PartitionedRegion for the given durationInMillis.
+   */
+  private void executePuts(final int numEntries, final long durationInMillis) {
+    Cache cache = cacheRule.getCache();
+    Region<String, String> region = cache.getRegion(REGION_NAME);
+    Instant finishTime = Instant.now().plusMillis(durationInMillis);
+
+    while (Instant.now().isBefore(finishTime)) {
+      IntStream.range(0, numEntries).forEach(i -> region.put(String.valueOf(i), "Value_" + i));
+    }
+  }
+
+  /**
+   * Continuously execute putAll operations on the PartitionedRegion for the given
+   * durationInMillis.
+   */
+  private void executePutAlls(final int startKey, final int finalKey, final long durationInMillis) {
+    Cache cache = cacheRule.getCache();
+    Map<String, String> valuesToInsert = new HashMap<>();
+    Region<String, String> region = cache.getRegion(REGION_NAME);
+    IntStream.range(startKey, finalKey)
+        .forEach(i -> valuesToInsert.put(String.valueOf(i), "Value_" + i));
+    Instant finishTime = Instant.now().plusMillis(durationInMillis);
+
+    while (Instant.now().isBefore(finishTime)) {
+      region.putAll(valuesToInsert);
+    }
+  }
+
+  /**
+   * Continuously execute remove operations on the PartitionedRegion for the given
+   * durationInMillis.
+   */
+  private void executeRemoves(final int numEntries, final long durationInMillis) {
+    Cache cache = cacheRule.getCache();
+    Region<String, String> region = cache.getRegion(REGION_NAME);
+    Instant finishTime = Instant.now().plusMillis(durationInMillis);
+
+    while (Instant.now().isBefore(finishTime)) {
+      // The region might have been cleared in the meantime, which is why we check for null.
+      IntStream.range(0, numEntries).forEach(i -> {
+        Optional<String> nullableValue = Optional.ofNullable(region.remove(String.valueOf(i)));
+        nullableValue.ifPresent(value -> assertThat(value).isEqualTo("Value_" + i));
+      });
+    }
+  }
+
+  /**
+   * Continuously execute removeAll operations on the PartitionedRegion for the given
+   * durationInMillis.
+   */
+  private void executeRemoveAlls(final int startKey, final int finalKey,
+      final long durationInMillis) {
+    Cache cache = cacheRule.getCache();
+    List<String> keysToRemove = new ArrayList<>();
+    Region<String, String> region = cache.getRegion(REGION_NAME);
+    IntStream.range(startKey, finalKey).forEach(i -> keysToRemove.add(String.valueOf(i)));
+    Instant finishTime = Instant.now().plusMillis(durationInMillis);
+
+    while (Instant.now().isBefore(finishTime)) {
+      region.removeAll(keysToRemove);
+    }
+  }
+
+  /**
+   * Execute the clear operation and retry until success.
+   */
+  private void executeClearWithRetry(VM coordinator) {
+    coordinator.invoke(() -> {
+      boolean retry;
+
+      do {
+        retry = false;
+
+        try {
+          cacheRule.getCache().getRegion(REGION_NAME).clear();
+        } catch (PartitionedRegionPartialClearException pce) {
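+          // A partial clear means some primary buckets were unavailable; retry until new
+          // primaries are elected and the clear succeeds on every bucket.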
+          retry = true;
+        }
+
+      } while (retry);
+    });
+  }
+
+  /**
+   * Continuously execute clear operations on the PartitionedRegion every periodInMillis for the
+   * given durationInMillis.
+   */
+  private void executeClears(final long durationInMillis, final long periodInMillis)
+      throws InterruptedException {
+    Cache cache = cacheRule.getCache();
+    AtomicLong invocationCount = new AtomicLong(0);
+    Region<String, String> region = cache.getRegion(REGION_NAME);
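+    // The scheduled clear task must run at least durationInMillis / periodInMillis times
+    // before it is cancelled.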
+    long minimumInvocationCount = durationInMillis / periodInMillis;
+    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+    ScheduledFuture<?> scheduledFuture = executor.scheduleWithFixedDelay(() -> {
+      region.clear();
+      invocationCount.incrementAndGet();
+    }, 0, periodInMillis, TimeUnit.MILLISECONDS);
+
+    await().untilAsserted(
+        () -> assertThat(invocationCount.get()).isGreaterThanOrEqualTo(minimumInvocationCount));
+    scheduledFuture.cancel(false);
+    executor.shutdown();
+    executor.awaitTermination(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
+  }
+
+  /**
+   * The test does the following (clear coordinator and regionType are parametrized):
+   * - Launches one thread per VM to continuously execute removes, puts and gets for a given time.
+   * - Clears the Partition Region continuously every X milliseconds for a given time.
+   * - Asserts that, after the clears have finished, the Region Buckets are consistent across
+   * members.
+   */
+  @Test
+  @TestCaseName(TEST_CASE_NAME)
+  @Parameters(method = "coordinatorsAndRegionTypes")
+  public void clearWithConcurrentPutGetRemoveShouldWorkCorrectly(TestVM coordinatorVM,
+      RegionShortcut regionShortcut) throws InterruptedException {
+    final int entries = 15000;
+    final int workMillis = 60000;
+    parametrizedSetup(regionShortcut);
+
+    // Let all VMs continuously execute puts, gets and removes for 60 seconds.
+    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+        server1.invokeAsync(() -> executePuts(entries, workMillis)),
+        server2.invokeAsync(() -> executeGets(entries, workMillis)),
+        accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
+
+    // Clear the region every second for 60 seconds.
+    getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 1000));
+
+    // Let asyncInvocations finish.
+    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+      asyncInvocation.await();
+    }
+
+    // Assert Region Buckets are consistent.
+    asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
+    accessor.invoke(this::assertRegionBucketsConsistency);
+  }
+
+  /**
+   * The test does the following (clear coordinator and regionType are parametrized):
+   * - Launches two threads per VM to continuously execute putAll and removeAll for a given time.
+   * - Clears the Partition Region continuously every X milliseconds for a given time.
+   * - Asserts that, after the clears have finished, the Region Buckets are consistent across
+   * members.
+   */
+  @Test
+  @TestCaseName(TEST_CASE_NAME)
+  @Parameters(method = "coordinatorsAndRegionTypes")
+  public void clearWithConcurrentPutAllRemoveAllShouldWorkCorrectly(TestVM coordinatorVM,
+      RegionShortcut regionShortcut) throws InterruptedException {
+    final int workMillis = 15000;
+    parametrizedSetup(regionShortcut);
+
+    // Let all VMs continuously execute putAll and removeAll for 15 seconds.
+    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+        server1.invokeAsync(() -> executePutAlls(0, 2000, workMillis)),
+        server1.invokeAsync(() -> executeRemoveAlls(0, 2000, workMillis)),
+        server2.invokeAsync(() -> executePutAlls(2000, 4000, workMillis)),
+        server2.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)),
+        accessor.invokeAsync(() -> executePutAlls(4000, 6000, workMillis)),
+        accessor.invokeAsync(() -> executeRemoveAlls(4000, 6000, workMillis)));
+
+    // Clear the region every half second for 15 seconds.
+    getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 500));
+
+    // Let asyncInvocations finish.
+    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+      asyncInvocation.await();
+    }
+
+    // Assert Region Buckets are consistent.
+    asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
+    accessor.invoke(this::assertRegionBucketsConsistency);
+  }
+
+  /**
+   * The test does the following (regionType is parametrized):
+   * - Populates the Partition Region.
+   * - Verifies that the entries are synchronized on all members.
+   * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop the
+   * coordinator VM while the clear is in progress.
+   * - Clears the Partition Region (at this point the coordinator is restarted).
+   * - Asserts that, after the member joins again, the Region Buckets are consistent.
+   */
+  @Test
+  @TestCaseName("[{index}] {method}(RegionType:{0})")
+  @Parameters(method = "regionTypes")
+  public void clearShouldFailWhenCoordinatorMemberIsBounced(RegionShortcut regionShortcut) {
+    final int entries = 1000;
+    parametrizedSetup(regionShortcut);
+    populateRegion(accessor, entries, asList(accessor, server1, server2));
+
+    // Set the MemberKiller observer (targeting the coordinator) and try to clear the region.
+    server1.invoke(() -> {
+      DistributionMessageObserver.setInstance(new MemberKiller(true));
+      Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+      assertThatThrownBy(region::clear)
+          .isInstanceOf(DistributedSystemDisconnectedException.class)
+          .hasCauseInstanceOf(ForcedDisconnectException.class);
+    });
+
+    // Wait for member to get back online and assign all buckets.
+    server1.invoke(() -> {
+      cacheRule.createCache();
+      initDataStore(regionShortcut);
+      await().untilAsserted(
+          () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
+      PartitionRegionHelper.assignBucketsToPartitions(cacheRule.getCache().getRegion(REGION_NAME));
+    });
+
+    // Assert Region Buckets are consistent.
+    asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
+    accessor.invoke(this::assertRegionBucketsConsistency);
+  }
+
+  /**
+   * The test does the following (clear coordinator is chosen through parameters):
+   * - Populates the Partition Region.
+   * - Verifies that the entries are synchronized on all members.
+   * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
+   * non-coordinator VM while the clear is in progress (the member hosts primary buckets, so it
+   * still participates in the clear operation).
+   * - Launches two threads per VM to continuously execute gets, puts and removes for a given time.
+   * - Clears the Partition Region (at this point the non-coordinator is restarted).
+   * - Asserts that, after the clear has finished, the Region Buckets are consistent across members.
+   */
+  @Test
+  @Parameters(method = "coordinators")
+  @TestCaseName("[{index}] {method}(Coordinator:{0})")
+  public void clearOnRedundantPartitionRegionWithConcurrentPutGetRemoveShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced(
+      TestVM coordinatorVM) throws InterruptedException {
+    final int entries = 7500;
+    final int workMillis = 30000;
+    parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT);
+    populateRegion(accessor, entries, asList(accessor, server1, server2));
+    server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
+
+    // Let all VMs (except the one to kill) continuously execute gets, puts and removes
+    // for 30 seconds.
+    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+        server1.invokeAsync(() -> executeGets(entries, workMillis)),
+        server1.invokeAsync(() -> executePuts(entries, workMillis)),
+        accessor.invokeAsync(() -> executeGets(entries, workMillis)),
+        accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
+
+    // Retry the clear operation on the region until success (server2 will go down, but other
+    // members will eventually become primary for those buckets previously hosted by server2).
+    executeClearWithRetry(getVM(coordinatorVM.vmNumber));
+
+    // Wait for member to get back online.
+    server2.invoke(() -> {
+      cacheRule.createCache();
+      initDataStore(RegionShortcut.PARTITION_REDUNDANT);
+      await().untilAsserted(
+          () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
+    });
+
+    // Let asyncInvocations finish.
+    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+      asyncInvocation.await();
+    }
+
+    // Assert Region Buckets are consistent.
+    asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
+    accessor.invoke(this::assertRegionBucketsConsistency);
+  }
+
+  /**
+   * The test does the following (clear coordinator is chosen through parameters):
+   * - Populates the Partition Region.
+   * - Verifies that the entries are synchronized on all members.
+   * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
+   * non-coordinator VM while the clear is in progress (the member hosts primary buckets, so it
+   * still participates in the clear operation).
+   * - Launches two threads per VM to continuously execute gets, puts and removes for a given time.
+   * - Clears the Partition Region (at this point the non-coordinator is restarted).
+   * - Asserts that the clear operation failed with PartitionedRegionPartialClearException (primary
+   * buckets on the restarted members are not available).
+   */
+  @Test
+  @Parameters(method = "coordinators")
+  @TestCaseName("[{index}] {method}(Coordinator:{0})")
+  public void clearOnNonRedundantPartitionRegionWithConcurrentPutGetRemoveShouldFailWhenNonCoordinatorMembersAreBounced(
+      TestVM coordinatorVM) throws InterruptedException {
+    final int entries = 7500;
+    final int workMillis = 30000;
+    parametrizedSetup(RegionShortcut.PARTITION);
+    populateRegion(accessor, entries, asList(accessor, server1, server2));
+    server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
+
+    // Let all VMs (except the one to kill) continuously execute gets, puts and removes
+    // for 30 seconds.
+    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+        server1.invokeAsync(() -> executeGets(entries, workMillis)),
+        server1.invokeAsync(() -> executePuts(entries, workMillis)),
+        accessor.invokeAsync(() -> executeGets(entries, workMillis)),
+        accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
+
+    // Clear the region.
+    getVM(coordinatorVM.vmNumber).invoke(() -> {
+      assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear())
+          .isInstanceOf(PartitionedRegionPartialClearException.class);
+    });
+
+    // Let asyncInvocations finish.
+    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+      asyncInvocation.await();
+    }
+  }
+
+  /**
+   * The test does the following (clear coordinator is chosen through parameters):
+   * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
+   * non-coordinator VM while the clear is in progress (the member hosts primary buckets, so it
+   * still participates in the clear operation).
+   * - Launches one thread per VM to continuously execute putAll/removeAll for a given time.
+   * - Clears the Partition Region (at this point the non-coordinator is restarted).
+   * - Asserts that, after the clear has finished, the Region Buckets are consistent across members.
+   */
+  @Test
+  @Parameters(method = "coordinators")
+  @TestCaseName("[{index}] {method}(Coordinator:{0})")
+  public void clearOnRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced(
+      TestVM coordinatorVM) throws InterruptedException {
+    final int workMillis = 30000;
+    parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT);
+    server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
+
+    // Let all VMs continuously execute putAll/removeAll for 30 seconds.
+    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+        server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)),
+        accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
+
+    // Retry the clear operation on the region until success (server2 will go down, but other
+    // members will eventually become primary for those buckets previously hosted by server2).
+    executeClearWithRetry(getVM(coordinatorVM.vmNumber));
+
+    // Wait for member to get back online.
+    server2.invoke(() -> {
+      cacheRule.createCache();
+      initDataStore(RegionShortcut.PARTITION_REDUNDANT);
+      await().untilAsserted(
+          () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
+    });
+
+    // Let asyncInvocations finish.
+    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+      asyncInvocation.await();
+    }
+
+    // Assert Region Buckets are consistent.
+    asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
+    accessor.invoke(this::assertRegionBucketsConsistency);
+  }
+
+  /**
+   * The test does the following (clear coordinator is chosen through parameters):
+   * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a
+   * non-coordinator VM while the clear is in progress (the member hosts primary buckets, so it
+   * still participates in the clear operation).
+   * - Launches one thread per VM to continuously execute putAll/removeAll for a given time.
+   * - Clears the Partition Region (at this point the non-coordinator is restarted).
+   * - Asserts that the clear operation failed with PartitionedRegionPartialClearException (primary
+   * buckets on the restarted members are not available).
+   */
+  @Test
+  @Parameters(method = "coordinators")
+  @TestCaseName("[{index}] {method}(Coordinator:{0})")
+  public void clearOnNonRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldFailWhenNonCoordinatorMembersAreBounced(
+      TestVM coordinatorVM) throws InterruptedException {
+    final int workMillis = 30000;
+    parametrizedSetup(RegionShortcut.PARTITION);
+    server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
+
+    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+        server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)),
+        accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
+
+    // Clear the region.
+    getVM(coordinatorVM.vmNumber).invoke(() -> {
+      assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear())
+          .isInstanceOf(PartitionedRegionPartialClearException.class);
+    });
+
+    // Let asyncInvocations finish.
+    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+      asyncInvocation.await();
+    }
+  }
+
+  /**
+   * Shuts down a member while the clear operation is in progress. Depending on the flag passed
+   * to the constructor, the member killed is either the clear coordinator or a non-coordinator.
+   */
+  public static class MemberKiller extends DistributionMessageObserver {
+    private final boolean coordinator;
+
+    public MemberKiller(boolean coordinator) {
+      this.coordinator = coordinator;
+    }
+
+    /**
+     * Shuts down the VM whenever the message is an instance of
+     * {@link PartitionedRegionClearMessage}.
+     */
+    private void shutdownMember(DistributionMessage message) {
+      if (message instanceof PartitionedRegionClearMessage) {
+        if (((PartitionedRegionClearMessage) message)
+            .getOp() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) {
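+          // Simulate an abrupt member crash: remove the observer so the kill happens only
+          // once, block reconnect attempts and force-disconnect the distributed system.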
+          DistributionMessageObserver.setInstance(null);
+          InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect();
+          MembershipManagerHelper
+              .crashDistributedSystem(InternalDistributedSystem.getConnectedInstance());
+          await().untilAsserted(
+              () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNull());
+        }
+      }
+    }
+
+    /**
+     * Invoked only on the clear coordinator VM.
+     *
+     * @param dm the distribution manager that received the message
+     * @param message The message itself
+     */
+    @Override
+    public void beforeSendMessage(ClusterDistributionManager dm, DistributionMessage message) {
+      if (coordinator) {
+        shutdownMember(message);
+      } else {
+        super.beforeSendMessage(dm, message);
+      }
+    }
+
+    /**
+     * Invoked only on VMs that are not the clear coordinator.
+     *
+     * @param dm the distribution manager that received the message
+     * @param message The message itself
+     */
+    @Override
+    public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
+      if (!coordinator) {
+        shutdownMember(message);
+      } else {
+        super.beforeProcessMessage(dm, message);
+      }
+    }
+  }
+}