You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2021/04/30 01:16:53 UTC
[geode] 15/17: GEODE-9132: Always acquire write lock for PR clear
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 30f5e90c8e116e90adeec6e2ad847c5427b50942
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Wed Apr 21 15:24:06 2021 -0700
GEODE-9132: Always acquire write lock for PR clear
---
...gionClearWithConcurrentOperationsDUnitTest.java | 110 ++++++++---------
.../internal/cache/PartitionedRegionClear.java | 115 ++++++++++++------
.../internal/cache/PartitionedRegionClearTest.java | 131 +++++++++++++++++----
3 files changed, 246 insertions(+), 110 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
index 7ef187f..710ae6f 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
@@ -14,6 +14,9 @@
*/
package org.apache.geode.internal.cache;
+import static java.time.Duration.ofMillis;
+import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT;
import static org.apache.geode.internal.util.ArrayUtils.asList;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.VM.getVM;
@@ -21,6 +24,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.Serializable;
+import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
@@ -73,11 +77,16 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
private static final int BUCKETS = 13;
private static final String REGION_NAME = "PartitionedRegion";
+ private static final Duration WORK_DURATION = Duration.ofSeconds(15);
@Rule
public DistributedRule distributedRule = new DistributedRule(3);
@Rule
- public CacheRule cacheRule = CacheRule.builder().createCacheInAll().build();
+ public CacheRule cacheRule = CacheRule.builder()
+ .addSystemProperty(MAX_WAIT_TIME_RECONNECT, "1000")
+ .addSystemProperty(MEMBER_TIMEOUT, "2000")
+ .createCacheInAll()
+ .build();
private VM server1;
private VM server2;
@@ -106,15 +115,15 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
createRegions(regionShortcut);
// Let all VMs continuously execute puts and gets for 60 seconds.
- final int workMillis = 60000;
+ // final int workMillis = 60000;
final int entries = 15000;
List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
- server1.invokeAsync(() -> executePuts(entries, workMillis)),
- server2.invokeAsync(() -> executeGets(entries, workMillis)),
- accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
+ server1.invokeAsync(() -> executePuts(entries, WORK_DURATION)),
+ server2.invokeAsync(() -> executeGets(entries, WORK_DURATION)),
+ accessor.invokeAsync(() -> executeRemoves(entries, WORK_DURATION)));
// Clear the region every second for 60 seconds.
- getVM(coordinatorVM.getVmId()).invoke(() -> executeClears(workMillis, 1000));
+ getVM(coordinatorVM.getVmId()).invoke(() -> executeClears(WORK_DURATION, ofMillis(1000)));
// Let asyncInvocations finish.
for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
@@ -142,17 +151,17 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
createRegions(regionShortcut);
// Let all VMs continuously execute putAll and removeAll for 15 seconds.
- final int workMillis = 15000;
+ // final int workMillis = 15000;
List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
- server1.invokeAsync(() -> executePutAlls(0, 2000, workMillis)),
- server1.invokeAsync(() -> executeRemoveAlls(0, 2000, workMillis)),
- server2.invokeAsync(() -> executePutAlls(2000, 4000, workMillis)),
- server2.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)),
- accessor.invokeAsync(() -> executePutAlls(4000, 6000, workMillis)),
- accessor.invokeAsync(() -> executeRemoveAlls(4000, 6000, workMillis)));
+ server1.invokeAsync(() -> executePutAlls(0, 2000, WORK_DURATION)),
+ server1.invokeAsync(() -> executeRemoveAlls(0, 2000, WORK_DURATION)),
+ server2.invokeAsync(() -> executePutAlls(2000, 4000, WORK_DURATION)),
+ server2.invokeAsync(() -> executeRemoveAlls(2000, 4000, WORK_DURATION)),
+ accessor.invokeAsync(() -> executePutAlls(4000, 6000, WORK_DURATION)),
+ accessor.invokeAsync(() -> executeRemoveAlls(4000, 6000, WORK_DURATION)));
// Clear the region every half second for 15 seconds.
- getVM(coordinatorVM.getVmId()).invoke(() -> executeClears(workMillis, 500));
+ getVM(coordinatorVM.getVmId()).invoke(() -> executeClears(WORK_DURATION, ofMillis(500)));
// Let asyncInvocations finish.
for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
@@ -226,12 +235,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
// Let all VMs (except the one to kill) continuously execute gets, put and removes for 30"
- final int workMillis = 30000;
+ // final int workMillis = 30000;
List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
- server1.invokeAsync(() -> executeGets(entries, workMillis)),
- server1.invokeAsync(() -> executePuts(entries, workMillis)),
- accessor.invokeAsync(() -> executeGets(entries, workMillis)),
- accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
+ server1.invokeAsync(() -> executeGets(entries, WORK_DURATION)),
+ server1.invokeAsync(() -> executePuts(entries, WORK_DURATION)),
+ accessor.invokeAsync(() -> executeGets(entries, WORK_DURATION)),
+ accessor.invokeAsync(() -> executeRemoves(entries, WORK_DURATION)));
// Retry the clear operation on the region until success (server2 will go down, but other
// members will eventually become primary for those buckets previously hosted by server2).
@@ -278,12 +287,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
// Let all VMs (except the one to kill) continuously execute gets, put and removes for 30"
- final int workMillis = 30000;
+ // final int workMillis = 30000;
List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
- server1.invokeAsync(() -> executeGets(entries, workMillis)),
- server1.invokeAsync(() -> executePuts(entries, workMillis)),
- accessor.invokeAsync(() -> executeGets(entries, workMillis)),
- accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
+ server1.invokeAsync(() -> executeGets(entries, WORK_DURATION)),
+ server1.invokeAsync(() -> executePuts(entries, WORK_DURATION)),
+ accessor.invokeAsync(() -> executeGets(entries, WORK_DURATION)),
+ accessor.invokeAsync(() -> executeRemoves(entries, WORK_DURATION)));
// Clear the region.
getVM(coordinatorVM.getVmId()).invoke(() -> {
@@ -315,10 +324,10 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
// Let all VMs continuously execute putAll/removeAll for 30 seconds.
- final int workMillis = 30000;
+ // final int workMillis = 30000;
List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
- server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)),
- accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
+ server1.invokeAsync(() -> executePutAlls(0, 6000, WORK_DURATION)),
+ accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, WORK_DURATION)));
// Retry the clear operation on the region until success (server2 will go down, but other
// members will eventually become primary for those buckets previously hosted by server2).
@@ -360,10 +369,9 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
createRegions(RegionShortcut.PARTITION);
server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
- final int workMillis = 30000;
List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
- server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)),
- accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
+ server1.invokeAsync(() -> executePutAlls(0, 6000, WORK_DURATION)),
+ accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, WORK_DURATION)));
// Clear the region.
getVM(coordinatorVM.getVmId()).invoke(() -> {
@@ -520,12 +528,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
}
/**
- * Continuously execute get operations on the PartitionedRegion for the given durationInMillis.
+ * Continuously execute get operations on the PartitionedRegion for the given duration.
*/
- private void executeGets(final int numEntries, final long durationInMillis) {
+ private void executeGets(final int numEntries, final Duration duration) {
Cache cache = cacheRule.getCache();
Region<String, String> region = cache.getRegion(REGION_NAME);
- Instant finishTime = Instant.now().plusMillis(durationInMillis);
+ Instant finishTime = Instant.now().plusMillis(duration.toMillis());
while (Instant.now().isBefore(finishTime)) {
// Region might have been cleared in between, that's why we check for null.
@@ -537,12 +545,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
}
/**
- * Continuously execute put operations on the PartitionedRegion for the given durationInMillis.
+ * Continuously execute put operations on the PartitionedRegion for the given duration.
*/
- private void executePuts(final int numEntries, final long durationInMillis) {
+ private void executePuts(final int numEntries, final Duration duration) {
Cache cache = cacheRule.getCache();
Region<String, String> region = cache.getRegion(REGION_NAME);
- Instant finishTime = Instant.now().plusMillis(durationInMillis);
+ Instant finishTime = Instant.now().plusMillis(duration.toMillis());
while (Instant.now().isBefore(finishTime)) {
IntStream.range(0, numEntries).forEach(i -> region.put(String.valueOf(i), "Value_" + i));
@@ -550,16 +558,15 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
}
/**
- * Continuously execute putAll operations on the PartitionedRegion for the given
- * durationInMillis.
+ * Continuously execute putAll operations on the PartitionedRegion for the given duration.
*/
- private void executePutAlls(final int startKey, final int finalKey, final long durationInMillis) {
+ private void executePutAlls(final int startKey, final int finalKey, final Duration duration) {
Cache cache = cacheRule.getCache();
Map<String, String> valuesToInsert = new HashMap<>();
Region<String, String> region = cache.getRegion(REGION_NAME);
IntStream.range(startKey, finalKey)
.forEach(i -> valuesToInsert.put(String.valueOf(i), "Value_" + i));
- Instant finishTime = Instant.now().plusMillis(durationInMillis);
+ Instant finishTime = Instant.now().plusMillis(duration.toMillis());
while (Instant.now().isBefore(finishTime)) {
region.putAll(valuesToInsert);
@@ -567,13 +574,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
}
/**
- * Continuously execute remove operations on the PartitionedRegion for the given
- * durationInMillis.
+ * Continuously execute remove operations on the PartitionedRegion for the given duration.
*/
- private void executeRemoves(final int numEntries, final long durationInMillis) {
+ private void executeRemoves(final int numEntries, final Duration duration) {
Cache cache = cacheRule.getCache();
Region<String, String> region = cache.getRegion(REGION_NAME);
- Instant finishTime = Instant.now().plusMillis(durationInMillis);
+ Instant finishTime = Instant.now().plusMillis(duration.toMillis());
while (Instant.now().isBefore(finishTime)) {
// Region might have been cleared in between, that's why we check for null.
@@ -585,16 +591,14 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
}
/**
- * Continuously execute removeAll operations on the PartitionedRegion for the given
- * durationInMillis.
+ * Continuously execute removeAll operations on the PartitionedRegion for the given duration.
*/
- private void executeRemoveAlls(final int startKey, final int finalKey,
- final long durationInMillis) {
+ private void executeRemoveAlls(final int startKey, final int finalKey, final Duration duration) {
Cache cache = cacheRule.getCache();
List<String> keysToRemove = new ArrayList<>();
Region<String, String> region = cache.getRegion(REGION_NAME);
IntStream.range(startKey, finalKey).forEach(i -> keysToRemove.add(String.valueOf(i)));
- Instant finishTime = Instant.now().plusMillis(durationInMillis);
+ Instant finishTime = Instant.now().plusMillis(duration.toMillis());
while (Instant.now().isBefore(finishTime)) {
region.removeAll(keysToRemove);
@@ -622,13 +626,13 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
}
/**
- * Continuously execute clear operations on the PartitionedRegion every periodInMillis for the
- * given durationInMillis.
+ * Continuously execute clear operations on the PartitionedRegion every period for the
+ * given duration.
*/
- private void executeClears(final long durationInMillis, final long periodInMillis) {
+ private void executeClears(final Duration duration, final Duration period) {
Cache cache = cacheRule.getCache();
Region<String, String> region = cache.getRegion(REGION_NAME);
- long minimumInvocationCount = durationInMillis / periodInMillis;
+ long minimumInvocationCount = duration.toMillis() / period.toMillis();
for (int invocationCount = 0; invocationCount < minimumInvocationCount; invocationCount++) {
region.clear();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
index 569f78c..b8597c1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
@@ -23,6 +23,7 @@ import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.OperationAbortedException;
@@ -30,10 +31,12 @@ import org.apache.geode.cache.PartitionedRegionPartialClearException;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.PartitionedRegionClearMessage.PartitionedRegionClearResponse;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -47,6 +50,8 @@ public class PartitionedRegionClear {
private final PartitionedRegion partitionedRegion;
+ private final DistributedLockService distributedLockService;
+
protected final LockForListenerAndClientNotification lockForListenerAndClientNotification =
new LockForListenerAndClientNotification();
@@ -55,8 +60,35 @@ public class PartitionedRegionClear {
protected final PartitionedRegionClearListener partitionedRegionClearListener =
new PartitionedRegionClearListener();
+ private final ColocationLeaderRegionProvider colocationLeaderRegionProvider;
+ private final AssignBucketsToPartitions assignBucketsToPartitions;
+ private final UpdateAttributesProcessorFactory updateAttributesProcessorFactory;
+
public PartitionedRegionClear(PartitionedRegion partitionedRegion) {
+ this(partitionedRegion,
+ partitionedRegion.getPartitionedRegionLockService(),
+ ColocationHelper::getLeaderRegion,
+ PartitionRegionHelper::assignBucketsToPartitions,
+ pr -> new UpdateAttributesProcessor(pr, true));
+ }
+
+ @VisibleForTesting
+ PartitionedRegionClear(PartitionedRegion partitionedRegion,
+ DistributedLockService distributedLockService,
+ ColocationLeaderRegionProvider colocationLeaderRegionProvider,
+ AssignBucketsToPartitions assignBucketsToPartitions,
+ UpdateAttributesProcessorFactory updateAttributesProcessorFactory) {
this.partitionedRegion = partitionedRegion;
+ this.distributedLockService = distributedLockService;
+ this.colocationLeaderRegionProvider = colocationLeaderRegionProvider;
+ this.assignBucketsToPartitions = assignBucketsToPartitions;
+ this.updateAttributesProcessorFactory = updateAttributesProcessorFactory;
+
+ // TODO: initialize needs to move out of constructor to prevent escape of reference to 'this'
+ initialize();
+ }
+
+ private void initialize() {
partitionedRegion.getDistributionManager()
.addMembershipListener(partitionedRegionClearListener);
}
@@ -67,7 +99,7 @@ public class PartitionedRegionClear {
void acquireDistributedClearLock(String clearLock) {
try {
- partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1);
+ distributedLockService.lock(clearLock, -1, -1);
} catch (IllegalStateException e) {
partitionedRegion.lockCheckReadiness();
throw e;
@@ -76,7 +108,7 @@ public class PartitionedRegionClear {
void releaseDistributedClearLock(String clearLock) {
try {
- partitionedRegion.getPartitionedRegionLockService().unlock(clearLock);
+ distributedLockService.unlock(clearLock);
} catch (IllegalStateException e) {
partitionedRegion.lockCheckReadiness();
} catch (Exception ex) {
@@ -303,7 +335,7 @@ public class PartitionedRegionClear {
protected Set<Integer> attemptToSendPartitionedRegionClearMessage(RegionEventImpl event,
PartitionedRegionClearMessage.OperationType op)
throws ForceReattemptException {
- Set<Integer> bucketsOperated = new HashSet<>();
+ Set<Integer> clearedBuckets = new HashSet<>();
if (partitionedRegion.getPRRoot() == null) {
if (logger.isDebugEnabled()) {
@@ -311,8 +343,10 @@ public class PartitionedRegionClear {
"Partition region {} failed to initialize. Remove its profile from remote members.",
this.partitionedRegion);
}
- new UpdateAttributesProcessor(partitionedRegion, true).distribute(false);
- return bucketsOperated;
+ updateAttributesProcessorFactory
+ .create(partitionedRegion)
+ .distribute(false);
+ return clearedBuckets;
}
final Set<InternalDistributedMember> configRecipients =
@@ -325,9 +359,9 @@ public class PartitionedRegionClear {
if (prConfig != null) {
for (Node node : prConfig.getNodes()) {
- InternalDistributedMember idm = node.getMemberId();
- if (!idm.equals(partitionedRegion.getMyId())) {
- configRecipients.add(idm);
+ InternalDistributedMember memberId = node.getMemberId();
+ if (!memberId.equals(partitionedRegion.getMyId())) {
+ configRecipients.add(memberId);
}
}
}
@@ -336,29 +370,29 @@ public class PartitionedRegionClear {
}
try {
- PartitionedRegionClearMessage.PartitionedRegionClearResponse resp =
- new PartitionedRegionClearMessage.PartitionedRegionClearResponse(
- partitionedRegion.getSystem(), configRecipients);
- PartitionedRegionClearMessage partitionedRegionClearMessage =
- new PartitionedRegionClearMessage(configRecipients, partitionedRegion, resp, op, event);
- partitionedRegionClearMessage.send();
+ PartitionedRegionClearResponse clearResponse =
+ new PartitionedRegionClearResponse(partitionedRegion.getSystem(), configRecipients);
+ PartitionedRegionClearMessage clearMessage =
+ new PartitionedRegionClearMessage(configRecipients, partitionedRegion, clearResponse, op,
+ event);
+ clearMessage.send();
- resp.waitForRepliesUninterruptibly();
- bucketsOperated = resp.bucketsCleared;
+ clearResponse.waitForRepliesUninterruptibly();
+ clearedBuckets = clearResponse.bucketsCleared;
} catch (ReplyException e) {
- Throwable t = e.getCause();
- if (t instanceof ForceReattemptException) {
- throw (ForceReattemptException) t;
+ Throwable cause = e.getCause();
+ if (cause instanceof ForceReattemptException) {
+ throw (ForceReattemptException) cause;
}
- if (t instanceof PartitionedRegionPartialClearException) {
- throw new PartitionedRegionPartialClearException(t.getMessage(), t);
+ if (cause instanceof PartitionedRegionPartialClearException) {
+ throw (PartitionedRegionPartialClearException) cause;
}
logger.warn(
"PartitionedRegionClear#sendPartitionedRegionClearMessage: Caught exception during ClearRegionMessage send and waiting for response",
e);
}
- return bucketsOperated;
+ return clearedBuckets;
}
/**
@@ -412,14 +446,9 @@ public class PartitionedRegionClear {
invokeCacheWriter(regionEvent);
}
- // Check if there are any listeners or clients interested. If so, then clear write
- // locks needs to be taken on all local and remote primary buckets in order to
- // preserve the ordering of client events (for concurrent operations on the region).
- boolean acquireClearLockForNotification =
- (partitionedRegion.hasAnyClientsInterested() || partitionedRegion.hasListener());
- if (acquireClearLockForNotification) {
- obtainLockForClear(regionEvent);
- }
+ // clear write locks need to be taken on all local and remote primary buckets
+ // whether or not the partitioned region has any listeners or clients interested
+ obtainLockForClear(regionEvent);
try {
Set<Integer> bucketsCleared = clearRegion(regionEvent);
@@ -435,9 +464,7 @@ public class PartitionedRegionClear {
throw new PartitionedRegionPartialClearException(message);
}
} finally {
- if (acquireClearLockForNotification) {
- releaseLockForClear(regionEvent);
- }
+ releaseLockForClear(regionEvent);
}
} finally {
releaseDistributedClearLock(lockName);
@@ -458,8 +485,8 @@ public class PartitionedRegionClear {
}
protected void assignAllPrimaryBuckets() {
- PartitionedRegion leader = ColocationHelper.getLeaderRegion(partitionedRegion);
- PartitionRegionHelper.assignBucketsToPartitions(leader);
+ PartitionedRegion leader = colocationLeaderRegionProvider.getLeaderRegion(partitionedRegion);
+ assignBucketsToPartitions.assignBucketsToPartitions(leader);
}
protected void handleClearFromDepartedMember(InternalDistributedMember departedMember) {
@@ -505,6 +532,24 @@ public class PartitionedRegionClear {
return membershipChange;
}
+ @FunctionalInterface
+ @VisibleForTesting
+ interface ColocationLeaderRegionProvider {
+ PartitionedRegion getLeaderRegion(PartitionedRegion partitionedRegion);
+ }
+
+ @FunctionalInterface
+ @VisibleForTesting
+ interface AssignBucketsToPartitions {
+ void assignBucketsToPartitions(PartitionedRegion partitionedRegion);
+ }
+
+ @FunctionalInterface
+ @VisibleForTesting
+ interface UpdateAttributesProcessorFactory {
+ UpdateAttributesProcessor create(PartitionedRegion partitionedRegion);
+ }
+
protected class PartitionedRegionClearListener implements MembershipListener {
@Override
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java
index 721d236..376fc8e 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java
@@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.mockito.ArgumentCaptor.forClass;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
@@ -42,7 +43,6 @@ import org.mockito.ArgumentCaptor;
import org.apache.geode.CancelCriterion;
import org.apache.geode.cache.PartitionedRegionPartialClearException;
import org.apache.geode.cache.Region;
-import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionManager;
@@ -50,41 +50,74 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.PartitionedRegion.RetryTimeKeeper;
+import org.apache.geode.internal.cache.PartitionedRegionClear.AssignBucketsToPartitions;
+import org.apache.geode.internal.cache.PartitionedRegionClear.ColocationLeaderRegionProvider;
import org.apache.geode.internal.cache.PartitionedRegionClear.PartitionedRegionClearListener;
+import org.apache.geode.internal.cache.PartitionedRegionClear.UpdateAttributesProcessorFactory;
import org.apache.geode.internal.cache.PartitionedRegionClearMessage.OperationType;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
import org.apache.geode.internal.serialization.KnownVersion;
public class PartitionedRegionClearTest {
- private GemFireCacheImpl cache;
- private HashSet<AsyncEventQueue> allAEQs = new HashSet<>();
private PartitionedRegionClear partitionedRegionClear;
private DistributionManager distributionManager;
private PartitionedRegion partitionedRegion;
private RegionAdvisor regionAdvisor;
private InternalDistributedMember internalDistributedMember;
+ private DistributedLockService distributedLockService;
@Before
public void setUp() {
-
- cache = mock(GemFireCacheImpl.class);
+ AssignBucketsToPartitions assignBucketsToPartitions = mock(AssignBucketsToPartitions.class);
+ GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
+ ColocationLeaderRegionProvider colocationLeaderRegionProvider =
+ mock(ColocationLeaderRegionProvider.class);
+ distributedLockService = mock(DistributedLockService.class);
distributionManager = mock(DistributionManager.class);
+ FilterProfile filterProfile = mock(FilterProfile.class);
internalDistributedMember = mock(InternalDistributedMember.class);
partitionedRegion = mock(PartitionedRegion.class);
regionAdvisor = mock(RegionAdvisor.class);
+ UpdateAttributesProcessorFactory updateAttributesProcessorFactory =
+ mock(UpdateAttributesProcessorFactory.class);
+
+ when(cache.getAsyncEventQueues(false))
+ .thenReturn(emptySet());
+ when(colocationLeaderRegionProvider.getLeaderRegion(any()))
+ .thenReturn(partitionedRegion);
+ when(distributedLockService.lock(anyString(), anyInt(), anyInt()))
+ .thenReturn(true);
+ when(distributionManager.getDistributionManagerId())
+ .thenReturn(internalDistributedMember);
+ when(distributionManager.getId())
+ .thenReturn(internalDistributedMember);
+ when(internalDistributedMember.getVersion())
+ .thenReturn(KnownVersion.CURRENT);
+ when(partitionedRegion.getCache())
+ .thenReturn(cache);
+ when(partitionedRegion.getDistributionManager())
+ .thenReturn(distributionManager);
+ when(partitionedRegion.getName())
+ .thenReturn("prRegion");
+ when(partitionedRegion.getRegionAdvisor())
+ .thenReturn(regionAdvisor);
+ when(partitionedRegion.getFilterProfile())
+ .thenReturn(filterProfile);
+ when(filterProfile.getFilterRoutingInfoPart1(any(), any(), any()))
+ .thenReturn(mock(FilterRoutingInfo.class));
+ when(filterProfile.getFilterRoutingInfoPart2(any(), any()))
+ .thenReturn(mock(FilterRoutingInfo.class));
+ when(regionAdvisor.getDistributionManager())
+ .thenReturn(distributionManager);
+ when(updateAttributesProcessorFactory.create(any()))
+ .thenReturn(mock(UpdateAttributesProcessor.class));
- when(distributionManager.getDistributionManagerId()).thenReturn(internalDistributedMember);
- when(distributionManager.getId()).thenReturn(internalDistributedMember);
- when(internalDistributedMember.getVersion()).thenReturn(KnownVersion.CURRENT);
- when(partitionedRegion.getCache()).thenReturn(cache);
- when(cache.getAsyncEventQueues(false)).thenReturn(allAEQs);
- when(partitionedRegion.getDistributionManager()).thenReturn(distributionManager);
- when(partitionedRegion.getName()).thenReturn("prRegion");
- when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor);
- when(regionAdvisor.getDistributionManager()).thenReturn(distributionManager);
+ doNothing().when(distributedLockService).unlock(anyString());
- partitionedRegionClear = new PartitionedRegionClear(partitionedRegion);
+ partitionedRegionClear = new PartitionedRegionClear(partitionedRegion, distributedLockService,
+ colocationLeaderRegionProvider, assignBucketsToPartitions,
+ updateAttributesProcessorFactory);
}
@Test
@@ -115,9 +148,7 @@ public class PartitionedRegionClearTest {
@Test
public void acquireDistributedClearLockGetsDistributedLock() {
// arrange
- DistributedLockService distributedLockService = mock(DistributedLockService.class);
String lockName = PartitionedRegionClear.CLEAR_OPERATION + partitionedRegion.getName();
- when(partitionedRegion.getPartitionedRegionLockService()).thenReturn(distributedLockService);
// act
partitionedRegionClear.acquireDistributedClearLock(lockName);
@@ -129,9 +160,7 @@ public class PartitionedRegionClearTest {
@Test
public void releaseDistributedClearLockReleasesDistributedLock() {
// arrange
- DistributedLockService distributedLockService = mock(DistributedLockService.class);
String lockName = PartitionedRegionClear.CLEAR_OPERATION + partitionedRegion.getName();
- when(partitionedRegion.getPartitionedRegionLockService()).thenReturn(distributedLockService);
// act
partitionedRegionClear.releaseDistributedClearLock(lockName);
@@ -567,6 +596,7 @@ public class PartitionedRegionClearTest {
public void doClearAcquiresAndReleasesDistributedClearLockAndCreatesAllPrimaryBuckets() {
// arrange
RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+ when(regionEvent.clone()).thenReturn(regionEvent);
// partial mocking to stub some methods and verify
PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
@@ -587,6 +617,7 @@ public class PartitionedRegionClearTest {
public void doClearInvokesCacheWriterWhenCacheWriteIsSet() {
// arrange
RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+ when(regionEvent.clone()).thenReturn(regionEvent);
// partial mocking to stub some methods and verify
PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
@@ -605,6 +636,7 @@ public class PartitionedRegionClearTest {
public void doClearDoesNotInvokesCacheWriterWhenCacheWriteIsNotSet() {
// arrange
RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+ when(regionEvent.clone()).thenReturn(regionEvent);
// partial mocking to stub some methods and verify
PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
@@ -669,12 +701,13 @@ public class PartitionedRegionClearTest {
}
@Test
- public void doClearDoesNotObtainLockForClearWhenRegionHasNoListenerAndNoClientInterest() {
+ public void doClearObtainsLockForClearWhenRegionHasNoListenerAndNoClientInterest() {
// arrange
RegionEventImpl regionEvent = mock(RegionEventImpl.class);
when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false);
when(partitionedRegion.hasListener()).thenReturn(false);
+ when(regionEvent.clone()).thenReturn(regionEvent);
// partial mocking to stub some methods and verify
PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
@@ -688,8 +721,8 @@ public class PartitionedRegionClearTest {
spyPartitionedRegionClear.doClear(regionEvent, false);
// assert
- verify(spyPartitionedRegionClear, never()).obtainLockForClear(regionEvent);
- verify(spyPartitionedRegionClear, never()).releaseLockForClear(regionEvent);
+ verify(spyPartitionedRegionClear).obtainLockForClear(regionEvent);
+ verify(spyPartitionedRegionClear).releaseLockForClear(regionEvent);
}
@Test
@@ -867,6 +900,60 @@ public class PartitionedRegionClearTest {
.isNotNull();
}
+ @Test
+ public void doClearAcquiresLockForClearWhenHasAnyClientsInterestedIsTrue() {
+ // arrange
+ RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+ when(partitionedRegion.hasAnyClientsInterested()).thenReturn(true);
+ when(partitionedRegion.hasListener()).thenReturn(false);
+ when(regionEvent.clone()).thenReturn(regionEvent);
+
+ partitionedRegionClear = spy(partitionedRegionClear);
+ doNothing().when(partitionedRegionClear).obtainLockForClear(regionEvent);
+
+ // act
+ partitionedRegionClear.doClear(regionEvent, false);
+
+ // assert
+ verify(partitionedRegionClear).obtainLockForClear(regionEvent);
+ }
+
+ @Test
+ public void doClearAcquiresLockForClearWhenHasListenerIsTrue() {
+ // arrange
+ RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+ when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false);
+ when(partitionedRegion.hasListener()).thenReturn(true);
+ when(regionEvent.clone()).thenReturn(regionEvent);
+
+ partitionedRegionClear = spy(partitionedRegionClear);
+ doNothing().when(partitionedRegionClear).obtainLockForClear(regionEvent);
+
+ // act
+ partitionedRegionClear.doClear(regionEvent, false);
+
+ // assert
+ verify(partitionedRegionClear).obtainLockForClear(regionEvent);
+ }
+
+ @Test
+ public void doClearAcquiresLockForClearWhenHasAnyClientsInterestedAndHasListenerAreFalse() {
+ // arrange
+ RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+ when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false);
+ when(partitionedRegion.hasListener()).thenReturn(false);
+ when(regionEvent.clone()).thenReturn(regionEvent);
+
+ partitionedRegionClear = spy(partitionedRegionClear);
+ doNothing().when(partitionedRegionClear).obtainLockForClear(regionEvent);
+
+ // act
+ partitionedRegionClear.doClear(regionEvent, false);
+
+ // assert
+ verify(partitionedRegionClear).obtainLockForClear(regionEvent);
+ }
+
private Set<BucketRegion> setupBucketRegions(
PartitionedRegionDataStore dataStore,
BucketAdvisor bucketAdvisor) {