You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2021/04/30 01:16:49 UTC

[geode] 11/17: GEODE-9132: PartitionedRegionClearWithConcurrentOperationsDUnitTest cleanup 1

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 41eba463fb32d8ecd227c264854c5788190aab60
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Mon Apr 19 14:38:02 2021 -0700

    GEODE-9132: PartitionedRegionClearWithConcurrentOperationsDUnitTest cleanup 1
---
 ...gionClearWithConcurrentOperationsDUnitTest.java | 97 +++++++++-------------
 1 file changed, 41 insertions(+), 56 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
index 77537cb..c9a1e5b 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
@@ -28,11 +28,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
 
 import junitparams.JUnitParamsRunner;
@@ -61,7 +56,6 @@ import org.apache.geode.distributed.internal.membership.api.MembershipManagerHel
 import org.apache.geode.internal.cache.versions.RegionVersionHolder;
 import org.apache.geode.internal.cache.versions.RegionVersionVector;
 import org.apache.geode.internal.cache.versions.VersionSource;
-import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.rules.CacheRule;
@@ -73,15 +67,16 @@ import org.apache.geode.test.dunit.rules.DistributedRule;
  * added or removed.
  */
 @RunWith(JUnitParamsRunner.class)
+@SuppressWarnings("serial")
 public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements Serializable {
-  private static final Integer BUCKETS = 13;
+
+  private static final int BUCKETS = 13;
   private static final String REGION_NAME = "PartitionedRegion";
   private static final String TEST_CASE_NAME =
       "[{index}] {method}(Coordinator:{0}, RegionType:{1})";
 
   @Rule
   public DistributedRule distributedRule = new DistributedRule(3);
-
   @Rule
   public CacheRule cacheRule = CacheRule.builder().createCacheInAll().build();
 
@@ -89,22 +84,6 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   private VM server2;
   private VM accessor;
 
-  private enum TestVM {
-    ACCESSOR(0), SERVER1(1), SERVER2(2);
-
-    final int vmNumber;
-
-    TestVM(int vmNumber) {
-      this.vmNumber = vmNumber;
-    }
-  }
-
-  static RegionShortcut[] regionTypes() {
-    return new RegionShortcut[] {
-        RegionShortcut.PARTITION, RegionShortcut.PARTITION_REDUNDANT
-    };
-  }
-
   @SuppressWarnings("unused")
   static TestVM[] coordinators() {
     return new TestVM[] {
@@ -114,7 +93,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
 
   @SuppressWarnings("unused")
   static Object[] coordinatorsAndRegionTypes() {
-    ArrayList<Object[]> parameters = new ArrayList<>();
+    List<Object[]> parameters = new ArrayList<>();
     RegionShortcut[] regionShortcuts = regionTypes();
 
     Arrays.stream(regionShortcuts).forEach(regionShortcut -> {
@@ -125,6 +104,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
     return parameters.toArray();
   }
 
+  private static RegionShortcut[] regionTypes() {
+    return new RegionShortcut[] {
+        RegionShortcut.PARTITION, RegionShortcut.PARTITION_REDUNDANT
+    };
+  }
+
   @Before
   public void setUp() throws Exception {
     server1 = getVM(TestVM.SERVER1.vmNumber);
@@ -178,7 +163,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   /**
    * Populates the region and verifies the data on the selected VMs.
    */
-  private void populateRegion(VM feeder, int entryCount, List<VM> vms) {
+  private void populateRegion(VM feeder, int entryCount, Iterable<VM> vms) {
     feeder.invoke(() -> {
       Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
       IntStream.range(0, entryCount).forEach(i -> region.put(String.valueOf(i), "Value_" + i));
@@ -240,14 +225,13 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
    * Asserts that the region data is consistent across buckets.
    */
   private void assertRegionBucketsConsistency() throws ForceReattemptException {
-    List<BucketDump> bucketDumps;
     PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
     // Redundant copies + 1 primary.
     int expectedCopies = region.getRedundantCopies() + 1;
 
     for (int bId = 0; bId < BUCKETS; bId++) {
       final int bucketId = bId;
-      bucketDumps = region.getAllBucketEntries(bucketId);
+      List<BucketDump> bucketDumps = region.getAllBucketEntries(bucketId);
       assertThat(bucketDumps.size())
           .as("Bucket " + bucketId + " should have " + expectedCopies + " copies, but has "
               + bucketDumps.size())
@@ -379,26 +363,16 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
 
   /**
    * Continuously execute clear operations on the PartitionedRegion every periodInMillis for the
-   * given
-   * durationInMillis.
+   * given durationInMillis.
    */
-  private void executeClears(final long durationInMillis, final long periodInMillis)
-      throws InterruptedException {
+  private void executeClears(final long durationInMillis, final long periodInMillis) {
     Cache cache = cacheRule.getCache();
-    AtomicLong invocationCount = new AtomicLong(0);
     Region<String, String> region = cache.getRegion(REGION_NAME);
-    Long minimumInvocationCount = (durationInMillis / periodInMillis);
-    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
-    ScheduledFuture<?> scheduledFuture = executor.scheduleWithFixedDelay(() -> {
+    long minimumInvocationCount = durationInMillis / periodInMillis;
+
+    for (int invocationCount = 0; invocationCount < minimumInvocationCount; invocationCount++) {
       region.clear();
-      invocationCount.incrementAndGet();
-    }, 0, periodInMillis, TimeUnit.MILLISECONDS);
-
-    await().untilAsserted(
-        () -> assertThat(invocationCount.get()).isGreaterThanOrEqualTo(minimumInvocationCount));
-    scheduledFuture.cancel(false);
-    executor.shutdown();
-    executor.awaitTermination(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
+    }
   }
 
   /**
@@ -413,11 +387,11 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   @Parameters(method = "coordinatorsAndRegionTypes")
   public void clearWithConcurrentPutGetRemoveShouldWorkCorrectly(TestVM coordinatorVM,
       RegionShortcut regionShortcut) throws InterruptedException {
-    final int entries = 15000;
-    final int workMillis = 60000;
     parametrizedSetup(regionShortcut);
 
     // Let all VMs continuously execute puts and gets for 60 seconds.
+    final int workMillis = 60000;
+    final int entries = 15000;
     List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
         server1.invokeAsync(() -> executePuts(entries, workMillis)),
         server2.invokeAsync(() -> executeGets(entries, workMillis)),
@@ -448,10 +422,10 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   @Parameters(method = "coordinatorsAndRegionTypes")
   public void clearWithConcurrentPutAllRemoveAllShouldWorkCorrectly(TestVM coordinatorVM,
       RegionShortcut regionShortcut) throws InterruptedException {
-    final int workMillis = 15000;
     parametrizedSetup(regionShortcut);
 
     // Let all VMs continuously execute putAll and removeAll for 15 seconds.
+    final int workMillis = 15000;
     List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
         server1.invokeAsync(() -> executePutAlls(0, 2000, workMillis)),
         server1.invokeAsync(() -> executeRemoveAlls(0, 2000, workMillis)),
@@ -486,8 +460,8 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   @TestCaseName("[{index}] {method}(RegionType:{0})")
   @Parameters(method = "regionTypes")
   public void clearShouldFailWhenCoordinatorMemberIsBounced(RegionShortcut regionShortcut) {
-    final int entries = 1000;
     parametrizedSetup(regionShortcut);
+    final int entries = 1000;
     populateRegion(accessor, entries, asList(accessor, server1, server2));
 
     // Set the CoordinatorMemberKiller and try to clear the region
@@ -529,13 +503,13 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   @TestCaseName("[{index}] {method}(Coordinator:{0})")
   public void clearOnRedundantPartitionRegionWithConcurrentPutGetRemoveShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced(
       TestVM coordinatorVM) throws InterruptedException {
-    final int entries = 7500;
-    final int workMillis = 30000;
     parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT);
+    final int entries = 7500;
     populateRegion(accessor, entries, asList(accessor, server1, server2));
     server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
 
     // Let all VMs (except the one to kill) continuously execute gets, put and removes for 30"
+    final int workMillis = 30000;
     List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
         server1.invokeAsync(() -> executeGets(entries, workMillis)),
         server1.invokeAsync(() -> executePuts(entries, workMillis)),
@@ -581,13 +555,13 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   @TestCaseName("[{index}] {method}(Coordinator:{0})")
   public void clearOnNonRedundantPartitionRegionWithConcurrentPutGetRemoveShouldFailWhenNonCoordinatorMembersAreBounced(
       TestVM coordinatorVM) throws InterruptedException {
-    final int entries = 7500;
-    final int workMillis = 30000;
     parametrizedSetup(RegionShortcut.PARTITION);
+    final int entries = 7500;
     populateRegion(accessor, entries, asList(accessor, server1, server2));
     server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
 
     // Let all VMs (except the one to kill) continuously execute gets, put and removes for 30"
+    final int workMillis = 30000;
     List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
         server1.invokeAsync(() -> executeGets(entries, workMillis)),
         server1.invokeAsync(() -> executePuts(entries, workMillis)),
@@ -620,11 +594,11 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   @TestCaseName("[{index}] {method}(Coordinator:{0})")
   public void clearOnRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced(
       TestVM coordinatorVM) throws InterruptedException {
-    final int workMillis = 30000;
     parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT);
     server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
 
     // Let all VMs continuously execute putAll/removeAll for 30 seconds.
+    final int workMillis = 30000;
     List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
         server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)),
         accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
@@ -666,10 +640,10 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   @TestCaseName("[{index}] {method}(Coordinator:{0})")
   public void clearOnNonRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldFailWhenNonCoordinatorMembersAreBounced(
       TestVM coordinatorVM) throws InterruptedException {
-    final int workMillis = 30000;
     parametrizedSetup(RegionShortcut.PARTITION);
     server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
 
+    final int workMillis = 30000;
     List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
         server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)),
         accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
@@ -686,13 +660,24 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
     }
   }
 
+  private enum TestVM {
+    ACCESSOR(0), SERVER1(1), SERVER2(2);
+
+    final int vmNumber;
+
+    TestVM(int vmNumber) {
+      this.vmNumber = vmNumber;
+    }
+  }
+
   /**
    * Shutdowns a coordinator member while the clear operation is in progress.
    */
-  public static class MemberKiller extends DistributionMessageObserver {
+  private static class MemberKiller extends DistributionMessageObserver {
+
     private final boolean coordinator;
 
-    public MemberKiller(boolean coordinator) {
+    private MemberKiller(boolean coordinator) {
       this.coordinator = coordinator;
     }