You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2021/04/30 01:16:49 UTC
[geode] 11/17: GEODE-9132:
PartitionedRegionClearWithConcurrentOperationsDUnitTest cleanup 1
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 41eba463fb32d8ecd227c264854c5788190aab60
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Mon Apr 19 14:38:02 2021 -0700
GEODE-9132: PartitionedRegionClearWithConcurrentOperationsDUnitTest cleanup 1
---
...gionClearWithConcurrentOperationsDUnitTest.java | 97 +++++++++-------------
1 file changed, 41 insertions(+), 56 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
index 77537cb..c9a1e5b 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
@@ -28,11 +28,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import junitparams.JUnitParamsRunner;
@@ -61,7 +56,6 @@ import org.apache.geode.distributed.internal.membership.api.MembershipManagerHel
import org.apache.geode.internal.cache.versions.RegionVersionHolder;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionSource;
-import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.CacheRule;
@@ -73,15 +67,16 @@ import org.apache.geode.test.dunit.rules.DistributedRule;
* added or removed.
*/
@RunWith(JUnitParamsRunner.class)
+@SuppressWarnings("serial")
public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements Serializable {
- private static final Integer BUCKETS = 13;
+
+ private static final int BUCKETS = 13;
private static final String REGION_NAME = "PartitionedRegion";
private static final String TEST_CASE_NAME =
"[{index}] {method}(Coordinator:{0}, RegionType:{1})";
@Rule
public DistributedRule distributedRule = new DistributedRule(3);
-
@Rule
public CacheRule cacheRule = CacheRule.builder().createCacheInAll().build();
@@ -89,22 +84,6 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
private VM server2;
private VM accessor;
- private enum TestVM {
- ACCESSOR(0), SERVER1(1), SERVER2(2);
-
- final int vmNumber;
-
- TestVM(int vmNumber) {
- this.vmNumber = vmNumber;
- }
- }
-
- static RegionShortcut[] regionTypes() {
- return new RegionShortcut[] {
- RegionShortcut.PARTITION, RegionShortcut.PARTITION_REDUNDANT
- };
- }
-
@SuppressWarnings("unused")
static TestVM[] coordinators() {
return new TestVM[] {
@@ -114,7 +93,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
@SuppressWarnings("unused")
static Object[] coordinatorsAndRegionTypes() {
- ArrayList<Object[]> parameters = new ArrayList<>();
+ List<Object[]> parameters = new ArrayList<>();
RegionShortcut[] regionShortcuts = regionTypes();
Arrays.stream(regionShortcuts).forEach(regionShortcut -> {
@@ -125,6 +104,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
return parameters.toArray();
}
+ private static RegionShortcut[] regionTypes() {
+ return new RegionShortcut[] {
+ RegionShortcut.PARTITION, RegionShortcut.PARTITION_REDUNDANT
+ };
+ }
+
@Before
public void setUp() throws Exception {
server1 = getVM(TestVM.SERVER1.vmNumber);
@@ -178,7 +163,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
/**
* Populates the region and verifies the data on the selected VMs.
*/
- private void populateRegion(VM feeder, int entryCount, List<VM> vms) {
+ private void populateRegion(VM feeder, int entryCount, Iterable<VM> vms) {
feeder.invoke(() -> {
Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
IntStream.range(0, entryCount).forEach(i -> region.put(String.valueOf(i), "Value_" + i));
@@ -240,14 +225,13 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
* Asserts that the region data is consistent across buckets.
*/
private void assertRegionBucketsConsistency() throws ForceReattemptException {
- List<BucketDump> bucketDumps;
PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
// Redundant copies + 1 primary.
int expectedCopies = region.getRedundantCopies() + 1;
for (int bId = 0; bId < BUCKETS; bId++) {
final int bucketId = bId;
- bucketDumps = region.getAllBucketEntries(bucketId);
+ List<BucketDump> bucketDumps = region.getAllBucketEntries(bucketId);
assertThat(bucketDumps.size())
.as("Bucket " + bucketId + " should have " + expectedCopies + " copies, but has "
+ bucketDumps.size())
@@ -379,26 +363,16 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
/**
* Continuously execute clear operations on the PartitionedRegion every periodInMillis for the
- * given
- * durationInMillis.
+ * given durationInMillis.
*/
- private void executeClears(final long durationInMillis, final long periodInMillis)
- throws InterruptedException {
+ private void executeClears(final long durationInMillis, final long periodInMillis) {
Cache cache = cacheRule.getCache();
- AtomicLong invocationCount = new AtomicLong(0);
Region<String, String> region = cache.getRegion(REGION_NAME);
- Long minimumInvocationCount = (durationInMillis / periodInMillis);
- ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
- ScheduledFuture<?> scheduledFuture = executor.scheduleWithFixedDelay(() -> {
+ long minimumInvocationCount = durationInMillis / periodInMillis;
+
+ for (int invocationCount = 0; invocationCount < minimumInvocationCount; invocationCount++) {
region.clear();
- invocationCount.incrementAndGet();
- }, 0, periodInMillis, TimeUnit.MILLISECONDS);
-
- await().untilAsserted(
- () -> assertThat(invocationCount.get()).isGreaterThanOrEqualTo(minimumInvocationCount));
- scheduledFuture.cancel(false);
- executor.shutdown();
- executor.awaitTermination(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
+ }
}
/**
@@ -413,11 +387,11 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
@Parameters(method = "coordinatorsAndRegionTypes")
public void clearWithConcurrentPutGetRemoveShouldWorkCorrectly(TestVM coordinatorVM,
RegionShortcut regionShortcut) throws InterruptedException {
- final int entries = 15000;
- final int workMillis = 60000;
parametrizedSetup(regionShortcut);
// Let all VMs continuously execute puts and gets for 60 seconds.
+ final int workMillis = 60000;
+ final int entries = 15000;
List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
server1.invokeAsync(() -> executePuts(entries, workMillis)),
server2.invokeAsync(() -> executeGets(entries, workMillis)),
@@ -448,10 +422,10 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
@Parameters(method = "coordinatorsAndRegionTypes")
public void clearWithConcurrentPutAllRemoveAllShouldWorkCorrectly(TestVM coordinatorVM,
RegionShortcut regionShortcut) throws InterruptedException {
- final int workMillis = 15000;
parametrizedSetup(regionShortcut);
// Let all VMs continuously execute putAll and removeAll for 15 seconds.
+ final int workMillis = 15000;
List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
server1.invokeAsync(() -> executePutAlls(0, 2000, workMillis)),
server1.invokeAsync(() -> executeRemoveAlls(0, 2000, workMillis)),
@@ -486,8 +460,8 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
@TestCaseName("[{index}] {method}(RegionType:{0})")
@Parameters(method = "regionTypes")
public void clearShouldFailWhenCoordinatorMemberIsBounced(RegionShortcut regionShortcut) {
- final int entries = 1000;
parametrizedSetup(regionShortcut);
+ final int entries = 1000;
populateRegion(accessor, entries, asList(accessor, server1, server2));
// Set the CoordinatorMemberKiller and try to clear the region
@@ -529,13 +503,13 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
@TestCaseName("[{index}] {method}(Coordinator:{0})")
public void clearOnRedundantPartitionRegionWithConcurrentPutGetRemoveShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced(
TestVM coordinatorVM) throws InterruptedException {
- final int entries = 7500;
- final int workMillis = 30000;
parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT);
+ final int entries = 7500;
populateRegion(accessor, entries, asList(accessor, server1, server2));
server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
// Let all VMs (except the one to kill) continuously execute gets, put and removes for 30"
+ final int workMillis = 30000;
List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
server1.invokeAsync(() -> executeGets(entries, workMillis)),
server1.invokeAsync(() -> executePuts(entries, workMillis)),
@@ -581,13 +555,13 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
@TestCaseName("[{index}] {method}(Coordinator:{0})")
public void clearOnNonRedundantPartitionRegionWithConcurrentPutGetRemoveShouldFailWhenNonCoordinatorMembersAreBounced(
TestVM coordinatorVM) throws InterruptedException {
- final int entries = 7500;
- final int workMillis = 30000;
parametrizedSetup(RegionShortcut.PARTITION);
+ final int entries = 7500;
populateRegion(accessor, entries, asList(accessor, server1, server2));
server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
// Let all VMs (except the one to kill) continuously execute gets, put and removes for 30"
+ final int workMillis = 30000;
List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
server1.invokeAsync(() -> executeGets(entries, workMillis)),
server1.invokeAsync(() -> executePuts(entries, workMillis)),
@@ -620,11 +594,11 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
@TestCaseName("[{index}] {method}(Coordinator:{0})")
public void clearOnRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced(
TestVM coordinatorVM) throws InterruptedException {
- final int workMillis = 30000;
parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT);
server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
// Let all VMs continuously execute putAll/removeAll for 30 seconds.
+ final int workMillis = 30000;
List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)),
accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
@@ -666,10 +640,10 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
@TestCaseName("[{index}] {method}(Coordinator:{0})")
public void clearOnNonRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldFailWhenNonCoordinatorMembersAreBounced(
TestVM coordinatorVM) throws InterruptedException {
- final int workMillis = 30000;
parametrizedSetup(RegionShortcut.PARTITION);
server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
+ final int workMillis = 30000;
List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)),
accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
@@ -686,13 +660,24 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
}
}
+ private enum TestVM {
+ ACCESSOR(0), SERVER1(1), SERVER2(2);
+
+ final int vmNumber;
+
+ TestVM(int vmNumber) {
+ this.vmNumber = vmNumber;
+ }
+ }
+
/**
* Shutdowns a coordinator member while the clear operation is in progress.
*/
- public static class MemberKiller extends DistributionMessageObserver {
+ private static class MemberKiller extends DistributionMessageObserver {
+
private final boolean coordinator;
- public MemberKiller(boolean coordinator) {
+ private MemberKiller(boolean coordinator) {
this.coordinator = coordinator;
}