You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2022/06/08 22:42:45 UTC
[geode] 13/22: GEODE-9132: PartitionedRegionClearWithConcurrentOperationsDUnitTest cleanup 3
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
commit b7c5189c2aa939793feca4fdba6a8723828b97b1
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Mon Apr 19 16:42:22 2021 -0700
GEODE-9132: PartitionedRegionClearWithConcurrentOperationsDUnitTest cleanup 3
---
...gionClearWithConcurrentOperationsDUnitTest.java | 99 ++++++++++++----------
1 file changed, 52 insertions(+), 47 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
index b2aacc05fa..7ef187fd1f 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
@@ -53,6 +53,7 @@ import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.DistributionMessageObserver;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper;
+import org.apache.geode.internal.cache.PartitionedRegionClearMessage.OperationType;
import org.apache.geode.internal.cache.versions.RegionVersionHolder;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionSource;
@@ -83,10 +84,10 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
private VM accessor;
@Before
- public void setUp() throws Exception {
- server1 = getVM(TestVM.SERVER1.vmNumber);
- server2 = getVM(TestVM.SERVER2.vmNumber);
- accessor = getVM(TestVM.ACCESSOR.vmNumber);
+ public void setUp() {
+ server1 = getVM(TestVM.SERVER1.getVmId());
+ server2 = getVM(TestVM.SERVER2.getVmId());
+ accessor = getVM(TestVM.ACCESSOR.getVmId());
}
/**
@@ -102,7 +103,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
@TestCaseName("[{index}] {method}(Coordinator:{0}, RegionType:{1})")
public void clearWithConcurrentPutGetRemoveShouldWorkCorrectly(TestVM coordinatorVM,
RegionShortcut regionShortcut) throws InterruptedException {
- parametrizedSetup(regionShortcut);
+ createRegions(regionShortcut);
// Let all VMs continuously execute puts and gets for 60 seconds.
final int workMillis = 60000;
@@ -113,7 +114,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
// Clear the region every second for 60 seconds.
- getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 1000));
+ getVM(coordinatorVM.getVmId()).invoke(() -> executeClears(workMillis, 1000));
// Let asyncInvocations finish.
for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
@@ -138,7 +139,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
@TestCaseName("[{index}] {method}(Coordinator:{0}, RegionType:{1})")
public void clearWithConcurrentPutAllRemoveAllShouldWorkCorrectly(TestVM coordinatorVM,
RegionShortcut regionShortcut) throws InterruptedException {
- parametrizedSetup(regionShortcut);
+ createRegions(regionShortcut);
// Let all VMs continuously execute putAll and removeAll for 15 seconds.
final int workMillis = 15000;
@@ -151,7 +152,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
accessor.invokeAsync(() -> executeRemoveAlls(4000, 6000, workMillis)));
// Clear the region every half second for 15 seconds.
- getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 500));
+ getVM(coordinatorVM.getVmId()).invoke(() -> executeClears(workMillis, 500));
// Let asyncInvocations finish.
for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
@@ -176,7 +177,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
@Parameters({"PARTITION", "PARTITION_REDUNDANT"})
@TestCaseName("[{index}] {method}(RegionType:{0})")
public void clearShouldFailWhenCoordinatorMemberIsBounced(RegionShortcut regionShortcut) {
- parametrizedSetup(regionShortcut);
+ createRegions(regionShortcut);
final int entries = 1000;
populateRegion(accessor, entries, asList(accessor, server1, server2));
@@ -192,7 +193,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
// Wait for member to get back online and assign all buckets.
server1.invoke(() -> {
cacheRule.createCache();
- initDataStore(regionShortcut);
+ createDataStore(regionShortcut);
await().untilAsserted(
() -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
PartitionRegionHelper.assignBucketsToPartitions(cacheRule.getCache().getRegion(REGION_NAME));
@@ -219,7 +220,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
@TestCaseName("[{index}] {method}(Coordinator:{0})")
public void clearOnRedundantPartitionRegionWithConcurrentPutGetRemoveShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced(
TestVM coordinatorVM) throws InterruptedException {
- parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT);
+ createRegions(RegionShortcut.PARTITION_REDUNDANT);
final int entries = 7500;
populateRegion(accessor, entries, asList(accessor, server1, server2));
server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
@@ -234,12 +235,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
// Retry the clear operation on the region until success (server2 will go down, but other
// members will eventually become primary for those buckets previously hosted by server2).
- executeClearWithRetry(getVM(coordinatorVM.vmNumber));
+ executeClearWithRetry(getVM(coordinatorVM.getVmId()));
// Wait for member to get back online.
server2.invoke(() -> {
cacheRule.createCache();
- initDataStore(RegionShortcut.PARTITION_REDUNDANT);
+ createDataStore(RegionShortcut.PARTITION_REDUNDANT);
await().untilAsserted(
() -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
});
@@ -271,7 +272,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
@TestCaseName("[{index}] {method}(Coordinator:{0})")
public void clearOnNonRedundantPartitionRegionWithConcurrentPutGetRemoveShouldFailWhenNonCoordinatorMembersAreBounced(
TestVM coordinatorVM) throws InterruptedException {
- parametrizedSetup(RegionShortcut.PARTITION);
+ createRegions(RegionShortcut.PARTITION);
final int entries = 7500;
populateRegion(accessor, entries, asList(accessor, server1, server2));
server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
@@ -285,7 +286,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
// Clear the region.
- getVM(coordinatorVM.vmNumber).invoke(() -> {
+ getVM(coordinatorVM.getVmId()).invoke(() -> {
assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear())
.isInstanceOf(PartitionedRegionPartialClearException.class);
});
@@ -310,7 +311,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
@TestCaseName("[{index}] {method}(Coordinator:{0})")
public void clearOnRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced(
TestVM coordinatorVM) throws InterruptedException {
- parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT);
+ createRegions(RegionShortcut.PARTITION_REDUNDANT);
server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
// Let all VMs continuously execute putAll/removeAll for 30 seconds.
@@ -321,12 +322,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
// Retry the clear operation on the region until success (server2 will go down, but other
// members will eventually become primary for those buckets previously hosted by server2).
- executeClearWithRetry(getVM(coordinatorVM.vmNumber));
+ executeClearWithRetry(getVM(coordinatorVM.getVmId()));
// Wait for member to get back online.
server2.invoke(() -> {
cacheRule.createCache();
- initDataStore(RegionShortcut.PARTITION_REDUNDANT);
+ createDataStore(RegionShortcut.PARTITION_REDUNDANT);
await().untilAsserted(
() -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
});
@@ -356,7 +357,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
@TestCaseName("[{index}] {method}(Coordinator:{0})")
public void clearOnNonRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldFailWhenNonCoordinatorMembersAreBounced(
TestVM coordinatorVM) throws InterruptedException {
- parametrizedSetup(RegionShortcut.PARTITION);
+ createRegions(RegionShortcut.PARTITION);
server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
final int workMillis = 30000;
@@ -365,7 +366,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
// Clear the region.
- getVM(coordinatorVM.vmNumber).invoke(() -> {
+ getVM(coordinatorVM.getVmId()).invoke(() -> {
assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear())
.isInstanceOf(PartitionedRegionPartialClearException.class);
});
@@ -376,7 +377,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
}
}
- private void initAccessor(RegionShortcut regionShortcut) {
+ private void createAccessor(RegionShortcut regionShortcut) {
PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>()
.setTotalNumBuckets(BUCKETS)
.setLocalMaxMemory(0)
@@ -388,7 +389,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
}
- private void initDataStore(RegionShortcut regionShortcut) {
+ private void createDataStore(RegionShortcut regionShortcut) {
PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>()
.setTotalNumBuckets(BUCKETS)
.create();
@@ -398,10 +399,10 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
.create(REGION_NAME);
}
- private void parametrizedSetup(RegionShortcut regionShortcut) {
- server1.invoke(() -> initDataStore(regionShortcut));
- server2.invoke(() -> initDataStore(regionShortcut));
- accessor.invoke(() -> initAccessor(regionShortcut));
+ private void createRegions(RegionShortcut regionShortcut) {
+ server1.invoke(() -> createDataStore(regionShortcut));
+ server2.invoke(() -> createDataStore(regionShortcut));
+ accessor.invoke(() -> createAccessor(regionShortcut));
}
private void waitForSilence() {
@@ -637,10 +638,14 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
private enum TestVM {
ACCESSOR(0), SERVER1(1), SERVER2(2);
- final int vmNumber;
+ private final int vmId;
- TestVM(int vmNumber) {
- this.vmNumber = vmNumber;
+ TestVM(int vmId) {
+ this.vmId = vmId;
+ }
+
+ int getVmId() {
+ return vmId;
}
}
@@ -655,24 +660,6 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
this.coordinator = coordinator;
}
- /**
- * Shutdowns the VM whenever the message is an instance of
- * {@link PartitionedRegionClearMessage}.
- */
- private void shutdownMember(DistributionMessage message) {
- if (message instanceof PartitionedRegionClearMessage) {
- if (((PartitionedRegionClearMessage) message)
- .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) {
- DistributionMessageObserver.setInstance(null);
- InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect();
- MembershipManagerHelper
- .crashDistributedSystem(InternalDistributedSystem.getConnectedInstance());
- await().untilAsserted(
- () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNull());
- }
- }
- }
-
/**
* Invoked only on clear coordinator VM.
*
@@ -702,5 +689,23 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
super.beforeProcessMessage(dm, message);
}
}
+
+ /**
+ * Shutdowns the VM whenever the message is an instance of
+ * {@link PartitionedRegionClearMessage}.
+ */
+ private void shutdownMember(DistributionMessage message) {
+ if (message instanceof PartitionedRegionClearMessage) {
+ PartitionedRegionClearMessage clearMessage = (PartitionedRegionClearMessage) message;
+ if (clearMessage.getOperationType() == OperationType.OP_PR_CLEAR) {
+ DistributionMessageObserver.setInstance(null);
+ InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect();
+ MembershipManagerHelper
+ .crashDistributedSystem(InternalDistributedSystem.getConnectedInstance());
+ await().untilAsserted(
+ () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNull());
+ }
+ }
+ }
}
}