You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2021/04/30 01:16:51 UTC

[geode] 13/17: GEODE-9132: PartitionedRegionClearWithConcurrentOperationsDUnitTest cleanup 3

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 1466a44cecebe10ec050fba8d85122445e3d4508
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Mon Apr 19 16:42:22 2021 -0700

    GEODE-9132: PartitionedRegionClearWithConcurrentOperationsDUnitTest cleanup 3
---
 ...gionClearWithConcurrentOperationsDUnitTest.java | 99 ++++++++++++----------
 1 file changed, 52 insertions(+), 47 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
index b2aacc0..7ef187f 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
@@ -53,6 +53,7 @@ import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.DistributionMessageObserver;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper;
+import org.apache.geode.internal.cache.PartitionedRegionClearMessage.OperationType;
 import org.apache.geode.internal.cache.versions.RegionVersionHolder;
 import org.apache.geode.internal.cache.versions.RegionVersionVector;
 import org.apache.geode.internal.cache.versions.VersionSource;
@@ -83,10 +84,10 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   private VM accessor;
 
   @Before
-  public void setUp() throws Exception {
-    server1 = getVM(TestVM.SERVER1.vmNumber);
-    server2 = getVM(TestVM.SERVER2.vmNumber);
-    accessor = getVM(TestVM.ACCESSOR.vmNumber);
+  public void setUp() {
+    server1 = getVM(TestVM.SERVER1.getVmId());
+    server2 = getVM(TestVM.SERVER2.getVmId());
+    accessor = getVM(TestVM.ACCESSOR.getVmId());
   }
 
   /**
@@ -102,7 +103,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   @TestCaseName("[{index}] {method}(Coordinator:{0}, RegionType:{1})")
   public void clearWithConcurrentPutGetRemoveShouldWorkCorrectly(TestVM coordinatorVM,
       RegionShortcut regionShortcut) throws InterruptedException {
-    parametrizedSetup(regionShortcut);
+    createRegions(regionShortcut);
 
     // Let all VMs continuously execute puts and gets for 60 seconds.
     final int workMillis = 60000;
@@ -113,7 +114,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
         accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
 
     // Clear the region every second for 60 seconds.
-    getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 1000));
+    getVM(coordinatorVM.getVmId()).invoke(() -> executeClears(workMillis, 1000));
 
     // Let asyncInvocations finish.
     for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
@@ -138,7 +139,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   @TestCaseName("[{index}] {method}(Coordinator:{0}, RegionType:{1})")
   public void clearWithConcurrentPutAllRemoveAllShouldWorkCorrectly(TestVM coordinatorVM,
       RegionShortcut regionShortcut) throws InterruptedException {
-    parametrizedSetup(regionShortcut);
+    createRegions(regionShortcut);
 
     // Let all VMs continuously execute putAll and removeAll for 15 seconds.
     final int workMillis = 15000;
@@ -151,7 +152,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
         accessor.invokeAsync(() -> executeRemoveAlls(4000, 6000, workMillis)));
 
     // Clear the region every half second for 15 seconds.
-    getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 500));
+    getVM(coordinatorVM.getVmId()).invoke(() -> executeClears(workMillis, 500));
 
     // Let asyncInvocations finish.
     for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
@@ -176,7 +177,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   @Parameters({"PARTITION", "PARTITION_REDUNDANT"})
   @TestCaseName("[{index}] {method}(RegionType:{0})")
   public void clearShouldFailWhenCoordinatorMemberIsBounced(RegionShortcut regionShortcut) {
-    parametrizedSetup(regionShortcut);
+    createRegions(regionShortcut);
     final int entries = 1000;
     populateRegion(accessor, entries, asList(accessor, server1, server2));
 
@@ -192,7 +193,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
     // Wait for member to get back online and assign all buckets.
     server1.invoke(() -> {
       cacheRule.createCache();
-      initDataStore(regionShortcut);
+      createDataStore(regionShortcut);
       await().untilAsserted(
           () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
       PartitionRegionHelper.assignBucketsToPartitions(cacheRule.getCache().getRegion(REGION_NAME));
@@ -219,7 +220,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   @TestCaseName("[{index}] {method}(Coordinator:{0})")
   public void clearOnRedundantPartitionRegionWithConcurrentPutGetRemoveShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced(
       TestVM coordinatorVM) throws InterruptedException {
-    parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT);
+    createRegions(RegionShortcut.PARTITION_REDUNDANT);
     final int entries = 7500;
     populateRegion(accessor, entries, asList(accessor, server1, server2));
     server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
@@ -234,12 +235,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
 
     // Retry the clear operation on the region until success (server2 will go down, but other
     // members will eventually become primary for those buckets previously hosted by server2).
-    executeClearWithRetry(getVM(coordinatorVM.vmNumber));
+    executeClearWithRetry(getVM(coordinatorVM.getVmId()));
 
     // Wait for member to get back online.
     server2.invoke(() -> {
       cacheRule.createCache();
-      initDataStore(RegionShortcut.PARTITION_REDUNDANT);
+      createDataStore(RegionShortcut.PARTITION_REDUNDANT);
       await().untilAsserted(
           () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
     });
@@ -271,7 +272,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   @TestCaseName("[{index}] {method}(Coordinator:{0})")
   public void clearOnNonRedundantPartitionRegionWithConcurrentPutGetRemoveShouldFailWhenNonCoordinatorMembersAreBounced(
       TestVM coordinatorVM) throws InterruptedException {
-    parametrizedSetup(RegionShortcut.PARTITION);
+    createRegions(RegionShortcut.PARTITION);
     final int entries = 7500;
     populateRegion(accessor, entries, asList(accessor, server1, server2));
     server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
@@ -285,7 +286,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
         accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
 
     // Clear the region.
-    getVM(coordinatorVM.vmNumber).invoke(() -> {
+    getVM(coordinatorVM.getVmId()).invoke(() -> {
       assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear())
           .isInstanceOf(PartitionedRegionPartialClearException.class);
     });
@@ -310,7 +311,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   @TestCaseName("[{index}] {method}(Coordinator:{0})")
   public void clearOnRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced(
       TestVM coordinatorVM) throws InterruptedException {
-    parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT);
+    createRegions(RegionShortcut.PARTITION_REDUNDANT);
     server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
 
     // Let all VMs continuously execute putAll/removeAll for 30 seconds.
@@ -321,12 +322,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
 
     // Retry the clear operation on the region until success (server2 will go down, but other
     // members will eventually become primary for those buckets previously hosted by server2).
-    executeClearWithRetry(getVM(coordinatorVM.vmNumber));
+    executeClearWithRetry(getVM(coordinatorVM.getVmId()));
 
     // Wait for member to get back online.
     server2.invoke(() -> {
       cacheRule.createCache();
-      initDataStore(RegionShortcut.PARTITION_REDUNDANT);
+      createDataStore(RegionShortcut.PARTITION_REDUNDANT);
       await().untilAsserted(
           () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
     });
@@ -356,7 +357,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   @TestCaseName("[{index}] {method}(Coordinator:{0})")
   public void clearOnNonRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldFailWhenNonCoordinatorMembersAreBounced(
       TestVM coordinatorVM) throws InterruptedException {
-    parametrizedSetup(RegionShortcut.PARTITION);
+    createRegions(RegionShortcut.PARTITION);
     server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
 
     final int workMillis = 30000;
@@ -365,7 +366,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
         accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
 
     // Clear the region.
-    getVM(coordinatorVM.vmNumber).invoke(() -> {
+    getVM(coordinatorVM.getVmId()).invoke(() -> {
       assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear())
           .isInstanceOf(PartitionedRegionPartialClearException.class);
     });
@@ -376,7 +377,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
     }
   }
 
-  private void initAccessor(RegionShortcut regionShortcut) {
+  private void createAccessor(RegionShortcut regionShortcut) {
     PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>()
         .setTotalNumBuckets(BUCKETS)
         .setLocalMaxMemory(0)
@@ -388,7 +389,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
 
   }
 
-  private void initDataStore(RegionShortcut regionShortcut) {
+  private void createDataStore(RegionShortcut regionShortcut) {
     PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>()
         .setTotalNumBuckets(BUCKETS)
         .create();
@@ -398,10 +399,10 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
         .create(REGION_NAME);
   }
 
-  private void parametrizedSetup(RegionShortcut regionShortcut) {
-    server1.invoke(() -> initDataStore(regionShortcut));
-    server2.invoke(() -> initDataStore(regionShortcut));
-    accessor.invoke(() -> initAccessor(regionShortcut));
+  private void createRegions(RegionShortcut regionShortcut) {
+    server1.invoke(() -> createDataStore(regionShortcut));
+    server2.invoke(() -> createDataStore(regionShortcut));
+    accessor.invoke(() -> createAccessor(regionShortcut));
   }
 
   private void waitForSilence() {
@@ -637,10 +638,14 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   private enum TestVM {
     ACCESSOR(0), SERVER1(1), SERVER2(2);
 
-    final int vmNumber;
+    private final int vmId;
 
-    TestVM(int vmNumber) {
-      this.vmNumber = vmNumber;
+    TestVM(int vmId) {
+      this.vmId = vmId;
+    }
+
+    int getVmId() {
+      return vmId;
     }
   }
 
@@ -656,24 +661,6 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
     }
 
     /**
-     * Shutdowns the VM whenever the message is an instance of
-     * {@link PartitionedRegionClearMessage}.
-     */
-    private void shutdownMember(DistributionMessage message) {
-      if (message instanceof PartitionedRegionClearMessage) {
-        if (((PartitionedRegionClearMessage) message)
-            .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) {
-          DistributionMessageObserver.setInstance(null);
-          InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect();
-          MembershipManagerHelper
-              .crashDistributedSystem(InternalDistributedSystem.getConnectedInstance());
-          await().untilAsserted(
-              () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNull());
-        }
-      }
-    }
-
-    /**
      * Invoked only on clear coordinator VM.
      *
      * @param dm the distribution manager that received the message
@@ -702,5 +689,23 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
         super.beforeProcessMessage(dm, message);
       }
     }
+
+    /**
+     * Shutdowns the VM whenever the message is an instance of
+     * {@link PartitionedRegionClearMessage}.
+     */
+    private void shutdownMember(DistributionMessage message) {
+      if (message instanceof PartitionedRegionClearMessage) {
+        PartitionedRegionClearMessage clearMessage = (PartitionedRegionClearMessage) message;
+        if (clearMessage.getOperationType() == OperationType.OP_PR_CLEAR) {
+          DistributionMessageObserver.setInstance(null);
+          InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect();
+          MembershipManagerHelper
+              .crashDistributedSystem(InternalDistributedSystem.getConnectedInstance());
+          await().untilAsserted(
+              () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNull());
+        }
+      }
+    }
   }
 }