You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2020/10/02 19:28:59 UTC
[geode] 13/19: GEODE-8173: Add unit test (coverage) for
PartitionedRegionClear class. (#5208)
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 271054561dd476578e69e2514fd204b132edb236
Author: agingade <ag...@pivotal.io>
AuthorDate: Mon Jun 8 10:23:50 2020 -0700
GEODE-8173: Add unit test (coverage) for PartitionedRegionClear class. (#5208)
* GEODE-8173: Add unit test (coverage) for PartitionedRegionClear class.
Co-authored-by: anilkumar gingade <an...@anilg.local>
---
.../cache/PRCacheListenerDistributedTest.java | 337 +++++++++++-
.../ReplicateCacheListenerDistributedTest.java | 4 +-
.../geode/internal/cache/PartitionedRegion.java | 2 +-
.../internal/cache/PartitionedRegionClear.java | 83 ++-
.../internal/cache/PartitionedRegionClearTest.java | 611 +++++++++++++++++++++
5 files changed, 999 insertions(+), 38 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java
index f4a9ac9..7d95473 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java
@@ -17,10 +17,18 @@ package org.apache.geode.cache;
import static org.apache.geode.test.dunit.VM.getVM;
import static org.apache.geode.test.dunit.VM.getVMCount;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
+import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -28,7 +36,13 @@ import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.junit.runners.Parameterized.UseParametersRunnerFactory;
+import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.dunit.rules.SharedCountersRule;
+import org.apache.geode.test.dunit.rules.SharedErrorCollector;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
/**
@@ -43,7 +57,28 @@ import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactor
@RunWith(Parameterized.class)
@UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
@SuppressWarnings("serial")
-public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistributedTest {
+public class PRCacheListenerDistributedTest implements Serializable {
+
+ protected static final String CLEAR = "CLEAR";
+ protected static final String REGION_DESTROY = "REGION_DESTROY";
+ private static final String CREATES = "CREATES";
+ private static final String UPDATES = "UPDATES";
+ private static final String INVALIDATES = "INVALIDATES";
+ private static final String DESTROYS = "DESTROYS";
+ private static final int ENTRY_VALUE = 0;
+ private static final int UPDATED_ENTRY_VALUE = 1;
+ private static final String KEY = "key-1";
+ @Rule
+ public DistributedRule distributedRule = new DistributedRule();
+ @Rule
+ public CacheRule cacheRule = CacheRule.builder().createCacheInAll().build();
+ @Rule
+ public SerializableTestName testName = new SerializableTestName();
+ @Rule
+ public SharedCountersRule sharedCountersRule = new SharedCountersRule();
+ @Rule
+ public SharedErrorCollector errorCollector = new SharedErrorCollector();
+ protected String regionName;
@Parameters
public static Collection<Object[]> data() {
@@ -59,7 +94,6 @@ public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistri
@Parameter(1)
public Boolean withData;
- @Override
protected Region<String, Integer> createRegion(final String name,
final CacheListener<String, Integer> listener) {
return createPartitionedRegion(name, listener, false);
@@ -99,22 +133,18 @@ public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistri
}
}
- @Override
protected int expectedCreates() {
return 1;
}
- @Override
protected int expectedUpdates() {
return 1;
}
- @Override
protected int expectedInvalidates() {
return 1;
}
- @Override
protected int expectedDestroys() {
return 1;
}
@@ -132,7 +162,8 @@ public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistri
region.destroyRegion();
- assertThat(sharedCountersRule.getTotal(REGION_DESTROY)).isEqualTo(expectedRegionDestroys());
+ assertThat(sharedCountersRule.getTotal(REGION_DESTROY))
+ .isGreaterThanOrEqualTo(expectedRegionDestroys());
}
@Test
@@ -321,4 +352,296 @@ public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistri
assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(1);
}
+ @Before
+ public void setUp() {
+ regionName = getClass().getSimpleName();
+
+ sharedCountersRule.initialize(CREATES);
+ sharedCountersRule.initialize(DESTROYS);
+ sharedCountersRule.initialize(INVALIDATES);
+ sharedCountersRule.initialize(UPDATES);
+ sharedCountersRule.initialize(CLEAR);
+ sharedCountersRule.initialize(REGION_DESTROY);
+ }
+
+ @Test
+ public void afterCreateIsInvokedInEveryMember() {
+ CacheListener<String, Integer> listener = new CreateCountingCacheListener();
+ Region<String, Integer> region = createRegion(regionName, listener);
+ for (int i = 0; i < getVMCount(); i++) {
+ getVM(i).invoke(() -> {
+ createRegion(regionName, listener);
+ });
+ }
+
+ region.put(KEY, ENTRY_VALUE, cacheRule.getSystem().getDistributedMember());
+
+ assertThat(sharedCountersRule.getTotal(CREATES)).isEqualTo(expectedCreates());
+ }
+
+ @Test
+ public void afterUpdateIsInvokedInEveryMember() {
+ CacheListener<String, Integer> listener = new UpdateCountingCacheListener();
+ Region<String, Integer> region = createRegion(regionName, listener);
+ for (int i = 0; i < getVMCount(); i++) {
+ getVM(i).invoke(() -> {
+ createRegion(regionName, listener);
+ });
+ }
+
+ region.put(KEY, ENTRY_VALUE, cacheRule.getSystem().getDistributedMember());
+ region.put(KEY, UPDATED_ENTRY_VALUE, cacheRule.getSystem().getDistributedMember());
+
+ assertThat(sharedCountersRule.getTotal(UPDATES)).isEqualTo(expectedUpdates());
+ }
+
+ @Test
+ public void afterInvalidateIsInvokedInEveryMember() {
+ CacheListener<String, Integer> listener = new InvalidateCountingCacheListener();
+ Region<String, Integer> region = createRegion(regionName, listener);
+ for (int i = 0; i < getVMCount(); i++) {
+ getVM(i).invoke(() -> {
+ createRegion(regionName, listener);
+ });
+ }
+ // Seed with ENTRY_VALUE, the only non-null old value the listener's checks accept.
+ region.put(KEY, ENTRY_VALUE, cacheRule.getSystem().getDistributedMember());
+ region.invalidate(KEY);
+
+ assertThat(sharedCountersRule.getTotal(INVALIDATES)).isEqualTo(expectedInvalidates());
+ assertThat(region.get(KEY)).isNull();
+ }
+
+ @Test
+ public void afterDestroyIsInvokedInEveryMember() {
+ CacheListener<String, Integer> listener = new DestroyCountingCacheListener();
+ Region<String, Integer> region = createRegion(regionName, listener);
+ for (int i = 0; i < getVMCount(); i++) {
+ getVM(i).invoke(() -> {
+ createRegion(regionName, listener);
+ });
+ }
+ // Seed with ENTRY_VALUE, the only non-null old value the listener's checks accept.
+ region.put(KEY, ENTRY_VALUE, cacheRule.getSystem().getDistributedMember());
+ region.destroy(KEY);
+
+ assertThat(sharedCountersRule.getTotal(DESTROYS)).isEqualTo(expectedDestroys());
+ }
+
+ @Test
+ public void afterClearIsInvokedInEveryMember() {
+ CacheListener<String, Integer> listener = new ClearCountingCacheListener();
+ Region<String, Integer> region = createRegion(regionName, listener);
+ for (int i = 0; i < getVMCount(); i++) {
+ getVM(i).invoke(() -> {
+ createRegion(regionName, listener);
+ });
+ }
+
+ region.clear();
+
+ assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(expectedClears());
+ }
+
+ protected int expectedClears() {
+ return getVMCount() + 1;
+ }
+
+ protected int expectedRegionDestroys() {
+ return getVMCount() + 1;
+ }
+
+ /**
+ * Calls fail(...) from entry and region-invalidate callbacks; tests override those they expect.
+ */
+ private abstract static class BaseCacheListener extends CacheListenerAdapter<String, Integer>
+ implements Serializable {
+
+ @Override
+ public void afterCreate(final EntryEvent<String, Integer> event) {
+ fail("Unexpected listener callback: afterCreate");
+ }
+
+ @Override
+ public void afterInvalidate(final EntryEvent<String, Integer> event) {
+ fail("Unexpected listener callback: afterInvalidate");
+ }
+
+ @Override
+ public void afterDestroy(final EntryEvent<String, Integer> event) {
+ fail("Unexpected listener callback: afterDestroy");
+ }
+
+ @Override
+ public void afterUpdate(final EntryEvent<String, Integer> event) {
+ fail("Unexpected listener callback: afterUpdate");
+ }
+
+ @Override
+ public void afterRegionInvalidate(final RegionEvent<String, Integer> event) {
+ fail("Unexpected listener callback: afterRegionInvalidate");
+ }
+ }
+
+ private class CreateCountingCacheListener extends BaseCacheListener {
+
+ @Override
+ public void afterCreate(final EntryEvent<String, Integer> event) {
+ sharedCountersRule.increment(CREATES);
+
+ errorCollector.checkThat(event.getDistributedMember(), equalTo(event.getCallbackArgument()));
+ errorCollector.checkThat(event.getOperation(), equalTo(Operation.CREATE));
+ errorCollector.checkThat(event.getOldValue(), nullValue());
+ errorCollector.checkThat(event.getNewValue(), equalTo(ENTRY_VALUE));
+
+ if (event.getSerializedOldValue() != null) {
+ errorCollector.checkThat(event.getSerializedOldValue().getDeserializedValue(),
+ equalTo(event.getOldValue()));
+ }
+ if (event.getSerializedNewValue() != null) {
+ errorCollector.checkThat(event.getSerializedNewValue().getDeserializedValue(),
+ equalTo(event.getNewValue()));
+ }
+ }
+ }
+
+ private class UpdateCountingCacheListener extends BaseCacheListener {
+
+ @Override
+ public void afterCreate(final EntryEvent<String, Integer> event) {
+ // Expected and ignored: the update test's first put creates the entry before updating it.
+ }
+
+ @Override
+ public void afterUpdate(final EntryEvent<String, Integer> event) {
+ sharedCountersRule.increment(UPDATES);
+ // afterUpdateIsInvokedInEveryMember passes the local member as the put's callback argument.
+ errorCollector.checkThat(event.getDistributedMember(), equalTo(event.getCallbackArgument()));
+ errorCollector.checkThat(event.getOperation(), equalTo(Operation.UPDATE));
+ errorCollector.checkThat(event.getOldValue(), anyOf(equalTo(ENTRY_VALUE), nullValue()));
+ errorCollector.checkThat(event.getNewValue(), equalTo(UPDATED_ENTRY_VALUE));
+ // Serialized forms, when present, must deserialize to the same old/new values.
+ if (event.getSerializedOldValue() != null) {
+ errorCollector.checkThat(event.getSerializedOldValue().getDeserializedValue(),
+ equalTo(event.getOldValue()));
+ }
+ if (event.getSerializedNewValue() != null) {
+ errorCollector.checkThat(event.getSerializedNewValue().getDeserializedValue(),
+ equalTo(event.getNewValue()));
+ }
+ }
+ }
+
+ private class InvalidateCountingCacheListener extends BaseCacheListener {
+
+ @Override
+ public void afterCreate(final EntryEvent<String, Integer> event) {
+ // Expected and ignored: the invalidate test puts the entry before invalidating it.
+ }
+
+ @Override
+ public void afterInvalidate(final EntryEvent<String, Integer> event) {
+ sharedCountersRule.increment(INVALIDATES);
+ // A remote-origin event must name another member; a local one must name this member.
+ if (event.isOriginRemote()) {
+ errorCollector.checkThat(event.getDistributedMember(),
+ not(cacheRule.getSystem().getDistributedMember()));
+ } else {
+ errorCollector.checkThat(event.getDistributedMember(),
+ equalTo(cacheRule.getSystem().getDistributedMember()));
+ }
+ errorCollector.checkThat(event.getOperation(), equalTo(Operation.INVALIDATE));
+ errorCollector.checkThat(event.getOldValue(), anyOf(equalTo(ENTRY_VALUE), nullValue()));
+ errorCollector.checkThat(event.getNewValue(), nullValue());
+ }
+ }
+
+ private class DestroyCountingCacheListener extends BaseCacheListener {
+
+ @Override
+ public void afterCreate(final EntryEvent<String, Integer> event) {
+ sharedCountersRule.increment(CREATES);
+ }
+
+ @Override
+ public void afterUpdate(final EntryEvent<String, Integer> event) {
+ sharedCountersRule.increment(UPDATES);
+ }
+
+ @Override
+ public void afterDestroy(final EntryEvent<String, Integer> event) {
+ sharedCountersRule.increment(DESTROYS);
+
+ if (event.isOriginRemote()) {
+ errorCollector.checkThat(event.getDistributedMember(),
+ not(cacheRule.getSystem().getDistributedMember()));
+ } else {
+ errorCollector.checkThat(event.getDistributedMember(),
+ equalTo(cacheRule.getSystem().getDistributedMember()));
+ }
+ errorCollector.checkThat(event.getOperation(), equalTo(Operation.DESTROY));
+ errorCollector.checkThat(event.getOldValue(), anyOf(equalTo(ENTRY_VALUE), nullValue()));
+ errorCollector.checkThat(event.getNewValue(), nullValue());
+ }
+ }
+
+ protected class ClearCountingCacheListener extends BaseCacheListener {
+
+ @Override
+ public void afterCreate(final EntryEvent<String, Integer> event) {
+ sharedCountersRule.increment(CREATES);
+ }
+
+ @Override
+ public void afterUpdate(final EntryEvent<String, Integer> event) {
+ sharedCountersRule.increment(UPDATES);
+ }
+
+ @Override
+ public void afterRegionClear(RegionEvent<String, Integer> event) {
+
+ sharedCountersRule.increment(CLEAR);
+ if (!event.getRegion().getAttributes().getDataPolicy().withPartitioning()) {
+ if (event.isOriginRemote()) {
+ errorCollector.checkThat(event.getDistributedMember(),
+ not(cacheRule.getSystem().getDistributedMember()));
+ } else {
+ errorCollector.checkThat(event.getDistributedMember(),
+ equalTo(cacheRule.getSystem().getDistributedMember()));
+ }
+ }
+ errorCollector.checkThat(event.getOperation(), equalTo(Operation.REGION_CLEAR));
+ errorCollector.checkThat(event.getRegion().getName(), equalTo(regionName));
+ }
+ }
+
+ protected class RegionDestroyCountingCacheListener extends BaseCacheListener {
+
+ @Override
+ public void afterCreate(final EntryEvent<String, Integer> event) {
+ sharedCountersRule.increment(CREATES);
+ }
+
+ @Override
+ public void afterUpdate(final EntryEvent<String, Integer> event) {
+ sharedCountersRule.increment(UPDATES);
+ }
+
+ @Override
+ public void afterRegionDestroy(final RegionEvent<String, Integer> event) {
+ sharedCountersRule.increment(REGION_DESTROY);
+
+ if (!event.getRegion().getAttributes().getDataPolicy().withPartitioning()) {
+ if (event.isOriginRemote()) {
+ errorCollector.checkThat(event.getDistributedMember(),
+ not(cacheRule.getSystem().getDistributedMember()));
+ } else {
+ errorCollector.checkThat(event.getDistributedMember(),
+ equalTo(cacheRule.getSystem().getDistributedMember()));
+ }
+ }
+ errorCollector.checkThat(event.getOperation(), equalTo(Operation.REGION_DESTROY));
+ errorCollector.checkThat(event.getRegion().getName(), equalTo(regionName));
+ }
+ }
}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java
index 6612833..dd229de 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java
@@ -51,8 +51,8 @@ public class ReplicateCacheListenerDistributedTest implements Serializable {
private static final String UPDATES = "UPDATES";
private static final String INVALIDATES = "INVALIDATES";
private static final String DESTROYS = "DESTROYS";
- protected static final String CLEAR = "CLEAR";
- protected static final String REGION_DESTROY = "REGION_DESTROY";
+ private static final String CLEAR = "CLEAR";
+ private static final String REGION_DESTROY = "REGION_DESTROY";
private static final int ENTRY_VALUE = 0;
private static final int UPDATED_ENTRY_VALUE = 1;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 2da4973..51eabe9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -10256,7 +10256,7 @@ public class PartitionedRegion extends LocalRegion
void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) {
// Synchronized to avoid other threads invoking clear on this vm/node.
synchronized (clearLock) {
- partitionedRegionClear.doClear(regionEvent, cacheWrite, this);
+ partitionedRegionClear.doClear(regionEvent, cacheWrite);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
index 69277ef..030b36e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
@@ -39,21 +39,24 @@ public class PartitionedRegionClear {
private static final Logger logger = LogService.getLogger();
- private static final String CLEAR_OPERATION = "_clearOperation";
+ protected static final String CLEAR_OPERATION = "_clearOperation";
private final int retryTime = 2 * 60 * 1000;
private final PartitionedRegion partitionedRegion;
- private final LockForListenerAndClientNotification lockForListenerAndClientNotification =
+ protected final LockForListenerAndClientNotification lockForListenerAndClientNotification =
new LockForListenerAndClientNotification();
private volatile boolean membershipChange = false;
+ protected final PartitionedRegionClearListener partitionedRegionClearListener =
+ new PartitionedRegionClearListener();
+
public PartitionedRegionClear(PartitionedRegion partitionedRegion) {
this.partitionedRegion = partitionedRegion;
partitionedRegion.getDistributionManager()
- .addMembershipListener(new PartitionedRegionClearListener());
+ .addMembershipListener(partitionedRegionClearListener);
}
public boolean isLockedForListenerAndClientNotification() {
@@ -79,6 +82,10 @@ public class PartitionedRegionClear {
}
}
+ protected PartitionedRegionClearListener getPartitionedRegionClearListener() {
+ return partitionedRegionClearListener;
+ }
+
void obtainLockForClear(RegionEventImpl event) {
obtainClearLockLocal(partitionedRegion.getDistributionManager().getId());
sendPartitionedRegionClearMessage(event,
@@ -100,9 +107,8 @@ public class PartitionedRegionClear {
return allBucketsCleared;
}
- private void waitForPrimary() {
+ protected void waitForPrimary(PartitionedRegion.RetryTimeKeeper retryTimer) {
boolean retry;
- PartitionedRegion.RetryTimeKeeper retryTimer = new PartitionedRegion.RetryTimeKeeper(retryTime);
do {
retry = false;
for (BucketRegion bucketRegion : partitionedRegion.getDataStore()
@@ -122,7 +128,7 @@ public class PartitionedRegionClear {
public ArrayList clearRegionLocal(RegionEventImpl regionEvent) {
ArrayList clearedBuckets = new ArrayList();
- membershipChange = false;
+ setMembershipChange(false);
// Synchronized to handle the requester departure.
synchronized (lockForListenerAndClientNotification) {
if (partitionedRegion.getDataStore() != null) {
@@ -130,18 +136,22 @@ public class PartitionedRegionClear {
try {
boolean retry;
do {
- waitForPrimary();
-
+ waitForPrimary(new PartitionedRegion.RetryTimeKeeper(retryTime));
+ RegionEventImpl bucketRegionEvent;
for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore()
.getAllLocalPrimaryBucketRegions()) {
if (localPrimaryBucketRegion.size() > 0) {
- localPrimaryBucketRegion.clear();
+ bucketRegionEvent =
+ new RegionEventImpl(localPrimaryBucketRegion, Operation.REGION_CLEAR, null,
+ false, partitionedRegion.getMyId(), regionEvent.getEventId());
+ localPrimaryBucketRegion.cmnClearRegion(bucketRegionEvent, false, true);
}
clearedBuckets.add(localPrimaryBucketRegion.getId());
}
- if (membershipChange) {
- membershipChange = false;
+ if (getMembershipChange()) {
+ // Retry and reset the membership change status.
+ setMembershipChange(false);
retry = true;
} else {
retry = false;
@@ -160,7 +170,7 @@ public class PartitionedRegionClear {
return clearedBuckets;
}
- private void doAfterClear(RegionEventImpl regionEvent) {
+ protected void doAfterClear(RegionEventImpl regionEvent) {
if (partitionedRegion.hasAnyClientsInterested()) {
notifyClients(regionEvent);
}
@@ -245,7 +255,7 @@ public class PartitionedRegionClear {
}
}
- private List sendPartitionedRegionClearMessage(RegionEventImpl event,
+ protected List sendPartitionedRegionClearMessage(RegionEventImpl event,
PartitionedRegionClearMessage.OperationType op) {
RegionEventImpl eventForLocalClear = (RegionEventImpl) event.clone();
eventForLocalClear.setOperation(Operation.REGION_LOCAL_CLEAR);
@@ -259,7 +269,7 @@ public class PartitionedRegionClear {
} while (true);
}
- private List attemptToSendPartitionedRegionClearMessage(RegionEventImpl event,
+ protected List attemptToSendPartitionedRegionClearMessage(RegionEventImpl event,
PartitionedRegionClearMessage.OperationType op)
throws ForceReattemptException {
List bucketsOperated = null;
@@ -321,30 +331,27 @@ public class PartitionedRegionClear {
return bucketsOperated;
}
- void doClear(RegionEventImpl regionEvent, boolean cacheWrite,
- PartitionedRegion partitionedRegion) {
- String lockName = CLEAR_OPERATION + partitionedRegion.getDisplayName();
+ void doClear(RegionEventImpl regionEvent, boolean cacheWrite) {
+ String lockName = CLEAR_OPERATION + partitionedRegion.getName();
try {
// distributed lock to make sure only one clear op is in progress in the cluster.
acquireDistributedClearLock(lockName);
// Force all primary buckets to be created before clear.
- PartitionRegionHelper.assignBucketsToPartitions(partitionedRegion);
+ assignAllPrimaryBuckets();
// do cacheWrite
- try {
- partitionedRegion.cacheWriteBeforeRegionClear(regionEvent);
- } catch (OperationAbortedException operationAbortedException) {
- throw new CacheWriterException(operationAbortedException);
+ if (cacheWrite) {
+ invokeCacheWriter(regionEvent);
}
// Check if there are any listeners or clients interested. If so, then clear write
// locks needs to be taken on all local and remote primary buckets in order to
// preserve the ordering of client events (for concurrent operations on the region).
- boolean acquireClearLockForClientNotification =
- (partitionedRegion.hasAnyClientsInterested() && partitionedRegion.hasListener());
- if (acquireClearLockForClientNotification) {
+ boolean acquireClearLockForNotification =
+ (partitionedRegion.hasAnyClientsInterested() || partitionedRegion.hasListener());
+ if (acquireClearLockForNotification) {
obtainLockForClear(regionEvent);
}
try {
@@ -362,7 +369,7 @@ public class PartitionedRegionClear {
throw new PartitionedRegionPartialClearException(message);
}
} finally {
- if (acquireClearLockForClientNotification) {
+ if (acquireClearLockForNotification) {
releaseLockForClear(regionEvent);
}
}
@@ -372,7 +379,19 @@ public class PartitionedRegionClear {
}
}
- void handleClearFromDepartedMember(InternalDistributedMember departedMember) {
+ protected void invokeCacheWriter(RegionEventImpl regionEvent) {
+ try {
+ partitionedRegion.cacheWriteBeforeRegionClear(regionEvent);
+ } catch (OperationAbortedException operationAbortedException) {
+ throw new CacheWriterException(operationAbortedException);
+ }
+ }
+
+ protected void assignAllPrimaryBuckets() {
+ PartitionRegionHelper.assignBucketsToPartitions(partitionedRegion);
+ }
+
+ protected void handleClearFromDepartedMember(InternalDistributedMember departedMember) {
if (departedMember.equals(lockForListenerAndClientNotification.getLockRequester())) {
synchronized (lockForListenerAndClientNotification) {
if (lockForListenerAndClientNotification.getLockRequester() != null) {
@@ -407,12 +426,20 @@ public class PartitionedRegionClear {
}
}
+ protected void setMembershipChange(boolean membershipChange) {
+ this.membershipChange = membershipChange;
+ }
+
+ protected boolean getMembershipChange() {
+ return membershipChange;
+ }
+
protected class PartitionedRegionClearListener implements MembershipListener {
@Override
public synchronized void memberDeparted(DistributionManager distributionManager,
InternalDistributedMember id, boolean crashed) {
- membershipChange = true;
+ setMembershipChange(true);
handleClearFromDepartedMember(id);
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java
new file mode 100644
index 0000000..d8c42af
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.PartitionedRegionPartialClearException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.distributed.DistributedLockService;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.MembershipListener;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
+
+public class PartitionedRegionClearTest {
+
+
+ private PartitionedRegionClear partitionedRegionClear;
+ private DistributionManager distributionManager;
+ private PartitionedRegion partitionedRegion;
+
+ @Before
+ public void setUp() {
+
+ partitionedRegion = mock(PartitionedRegion.class);
+ distributionManager = mock(DistributionManager.class);
+
+ when(partitionedRegion.getDistributionManager()).thenReturn(distributionManager);
+ when(partitionedRegion.getName()).thenReturn("prRegion");
+
+ partitionedRegionClear = new PartitionedRegionClear(partitionedRegion);
+ }
+
+ private Set<BucketRegion> setupBucketRegions(
+ PartitionedRegionDataStore partitionedRegionDataStore,
+ BucketAdvisor bucketAdvisor) {
+ final int numBuckets = 2;
+ Set<BucketRegion> bucketRegions = new HashSet<>();
+ for (int i = 0; i < numBuckets; i++) {
+ BucketRegion bucketRegion = mock(BucketRegion.class);
+ when(bucketRegion.getBucketAdvisor()).thenReturn(bucketAdvisor);
+ when(bucketRegion.size()).thenReturn(1);
+ when(bucketRegion.getId()).thenReturn(i);
+ bucketRegions.add(bucketRegion);
+ }
+
+ when(partitionedRegionDataStore.getAllLocalBucketRegions()).thenReturn(bucketRegions);
+ when(partitionedRegionDataStore.getAllLocalPrimaryBucketRegions()).thenReturn(bucketRegions);
+
+ return bucketRegions;
+ }
+
+ @Test
+ public void isLockedForListenerAndClientNotificationReturnsTrueWhenLocked() {
+ InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
+ when(distributionManager.isCurrentMember(internalDistributedMember)).thenReturn(true);
+ partitionedRegionClear.obtainClearLockLocal(internalDistributedMember);
+
+ assertThat(partitionedRegionClear.isLockedForListenerAndClientNotification()).isTrue();
+ }
+
+ @Test
+ public void isLockedForListenerAndClientNotificationReturnsFalseWhenMemberNotInTheSystemRequestsLock() {
+ InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
+ when(distributionManager.isCurrentMember(internalDistributedMember)).thenReturn(false);
+ // The non-member must actually request the lock; otherwise only the initial state is checked.
+ partitionedRegionClear.obtainClearLockLocal(internalDistributedMember);
+ assertThat(partitionedRegionClear.isLockedForListenerAndClientNotification()).isFalse();
+ }
+
+ @Test
+ public void acquireDistributedClearLockGetsDistributedLock() {
+ DistributedLockService distributedLockService = mock(DistributedLockService.class);
+ String lockName = PartitionedRegionClear.CLEAR_OPERATION + partitionedRegion.getName();
+ when(partitionedRegion.getPartitionedRegionLockService()).thenReturn(distributedLockService);
+
+ partitionedRegionClear.acquireDistributedClearLock(lockName);
+
+ verify(distributedLockService, times(1)).lock(lockName, -1, -1);
+ }
+
+ @Test
+ public void releaseDistributedClearLockReleasesDistributedLock() {
+ DistributedLockService distributedLockService = mock(DistributedLockService.class);
+ String lockName = PartitionedRegionClear.CLEAR_OPERATION + partitionedRegion.getName();
+ when(partitionedRegion.getPartitionedRegionLockService()).thenReturn(distributedLockService);
+
+ partitionedRegionClear.releaseDistributedClearLock(lockName);
+
+ verify(distributedLockService, times(1)).unlock(lockName);
+ }
+
+ @Test
+ public void obtainLockForClearGetsLocalLockAndSendsMessageForRemote() throws Exception {
+ RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+ when(regionEvent.clone()).thenReturn(mock(RegionEventImpl.class));
+ Region<String, PartitionRegionConfig> region = mock(Region.class);
+ when(partitionedRegion.getPRRoot()).thenReturn(region);
+ PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
+ doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear)
+ .attemptToSendPartitionedRegionClearMessage(regionEvent,
+ PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR);
+ InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
+ when(distributionManager.getId()).thenReturn(internalDistributedMember);
+
+ spyPartitionedRegionClear.obtainLockForClear(regionEvent);
+
+ verify(spyPartitionedRegionClear, times(1)).obtainClearLockLocal(internalDistributedMember);
+ verify(spyPartitionedRegionClear, times(1)).sendPartitionedRegionClearMessage(regionEvent,
+ PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR);
+ }
+
+ /**
+  * Checks that releaseLockForClear calls releaseClearLockLocal and calls
+  * sendPartitionedRegionClearMessage with OP_UNLOCK_FOR_PR_CLEAR.
+  */
+ @Test
+ public void releaseLockForClearReleasesLocalLockAndSendsMessageForRemote() throws Exception {
+   RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+   when(regionEvent.clone()).thenReturn(mock(RegionEventImpl.class));
+   Region<String, PartitionRegionConfig> prRoot = mock(Region.class);
+   when(partitionedRegion.getPRRoot()).thenReturn(prRoot);
+   InternalDistributedMember localMember = mock(InternalDistributedMember.class);
+   when(distributionManager.getId()).thenReturn(localMember);
+   PartitionedRegionClear clearSpy = spy(partitionedRegionClear);
+   // Stub the send attempt to return an empty list instead of running the real method.
+   doReturn(Collections.emptyList()).when(clearSpy)
+       .attemptToSendPartitionedRegionClearMessage(regionEvent,
+           PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR);
+
+   clearSpy.releaseLockForClear(regionEvent);
+
+   verify(clearSpy).releaseClearLockLocal();
+   verify(clearSpy).sendPartitionedRegionClearMessage(regionEvent,
+       PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR);
+ }
+
+ /**
+  * Checks that clearRegion calls clearRegionLocal with the event and calls
+  * sendPartitionedRegionClearMessage with OP_PR_CLEAR.
+  */
+ @Test
+ public void clearRegionClearsLocalAndSendsMessageForRemote() throws Exception {
+   RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+   when(regionEvent.clone()).thenReturn(mock(RegionEventImpl.class));
+   Region<String, PartitionRegionConfig> prRoot = mock(Region.class);
+   when(partitionedRegion.getPRRoot()).thenReturn(prRoot);
+   InternalDistributedMember localMember = mock(InternalDistributedMember.class);
+   when(distributionManager.getId()).thenReturn(localMember);
+   RegionVersionVector regionVersionVector = mock(RegionVersionVector.class);
+   PartitionedRegionClear clearSpy = spy(partitionedRegionClear);
+   // Stub the send attempt to return an empty list instead of running the real method.
+   doReturn(Collections.emptyList()).when(clearSpy)
+       .attemptToSendPartitionedRegionClearMessage(regionEvent,
+           PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR);
+
+   clearSpy.clearRegion(regionEvent, false, regionVersionVector);
+
+   verify(clearSpy).clearRegionLocal(regionEvent);
+   verify(clearSpy).sendPartitionedRegionClearMessage(regionEvent,
+       PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR);
+ }
+
+ /**
+  * When the bucket advisor reports a primary from the start, waitForPrimary must never call
+  * waitForBucketsRecovery on the retry timer.
+  */
+ @Test
+ public void waitForPrimaryReturnsAfterFindingAllPrimary() {
+   PartitionedRegion.RetryTimeKeeper timeKeeper = mock(PartitionedRegion.RetryTimeKeeper.class);
+   BucketAdvisor advisor = mock(BucketAdvisor.class);
+   when(advisor.hasPrimary()).thenReturn(true);
+   PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class);
+   setupBucketRegions(dataStore, advisor);
+   when(partitionedRegion.getDataStore()).thenReturn(dataStore);
+
+   partitionedRegionClear.waitForPrimary(timeKeeper);
+
+   verify(timeKeeper, times(0)).waitForBucketsRecovery();
+ }
+
+ /**
+  * When the bucket advisor reports no primary on the first query and a primary afterwards,
+  * waitForPrimary must call waitForBucketsRecovery exactly once.
+  */
+ @Test
+ public void waitForPrimaryReturnsAfterRetryForPrimary() {
+   PartitionedRegion.RetryTimeKeeper timeKeeper = mock(PartitionedRegion.RetryTimeKeeper.class);
+   BucketAdvisor advisor = mock(BucketAdvisor.class);
+   // First answer is "no primary"; every later answer is "has primary".
+   when(advisor.hasPrimary()).thenReturn(false, true);
+   PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class);
+   setupBucketRegions(dataStore, advisor);
+   when(partitionedRegion.getDataStore()).thenReturn(dataStore);
+
+   partitionedRegionClear.waitForPrimary(timeKeeper);
+
+   verify(timeKeeper).waitForBucketsRecovery();
+ }
+
+ /**
+  * With hasPrimary left unstubbed on the advisor mock and the retry timer reporting that it is
+  * over its maximum, waitForPrimary must throw PartitionedRegionPartialClearException without
+  * waiting for bucket recovery.
+  */
+ @Test
+ public void waitForPrimaryThrowsPartitionedRegionPartialClearException() {
+   PartitionedRegion.RetryTimeKeeper timeKeeper = mock(PartitionedRegion.RetryTimeKeeper.class);
+   when(timeKeeper.overMaximum()).thenReturn(true);
+   BucketAdvisor advisor = mock(BucketAdvisor.class);
+   PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class);
+   setupBucketRegions(dataStore, advisor);
+   when(partitionedRegion.getDataStore()).thenReturn(dataStore);
+
+   Throwable failure = catchThrowable(() -> partitionedRegionClear.waitForPrimary(timeKeeper));
+
+   assertThat(failure).isInstanceOf(PartitionedRegionPartialClearException.class);
+   assertThat(failure).hasMessage(
+       "Unable to find primary bucket region during clear operation for region: prRegion");
+   verify(timeKeeper, times(0)).waitForBucketsRecovery();
+ }
+
+ @Test
+ public void clearRegionLocalCallsClearOnLocalPrimaryBucketRegions() {
+ RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+ BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
+ when(bucketAdvisor.hasPrimary()).thenReturn(true);
+ PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class);
+ doNothing().when(partitionedRegionDataStore).lockBucketCreationForRegionClear();
+ Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor);
+ when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore);
+
+ List bucketsCleared = partitionedRegionClear.clearRegionLocal(regionEvent);
+
+ assertThat(bucketsCleared).hasSize(buckets.size());
+
+ ArgumentCaptor<RegionEventImpl> argument = ArgumentCaptor.forClass(RegionEventImpl.class);
+ for (BucketRegion bucketRegion : buckets) {
+ verify(bucketRegion, times(1)).cmnClearRegion(argument.capture(), eq(false), eq(true));
+ RegionEventImpl bucketRegionEvent = argument.getValue();
+ assertThat(bucketRegionEvent.getRegion()).isEqualTo(bucketRegion);
+ }
+ }
+
+ @Test
+ public void clearRegionLocalRetriesClearOnLocalPrimaryBucketRegions() {
+ RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+ BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
+ when(bucketAdvisor.hasPrimary()).thenReturn(true);
+ PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class);
+ doNothing().when(partitionedRegionDataStore).lockBucketCreationForRegionClear();
+ Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor);
+ when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore);
+ PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
+ when(spyPartitionedRegionClear.getMembershipChange()).thenReturn(true).thenReturn(false);
+
+ List bucketsCleared = spyPartitionedRegionClear.clearRegionLocal(regionEvent);
+
+ int expectedClears = buckets.size() * 2; /* clear is called twice on each bucket */
+ assertThat(bucketsCleared).hasSize(expectedClears);
+
+ ArgumentCaptor<RegionEventImpl> argument = ArgumentCaptor.forClass(RegionEventImpl.class);
+ for (BucketRegion bucketRegion : buckets) {
+ verify(bucketRegion, times(2)).cmnClearRegion(argument.capture(), eq(false), eq(true));
+ RegionEventImpl bucketRegionEvent = argument.getValue();
+ assertThat(bucketRegionEvent.getRegion()).isEqualTo(bucketRegion);
+ }
+ }
+
+ /**
+  * With client interest reported and the filter profile's routing calls stubbed, doAfterClear
+  * should set local filter info on the event and call notifyBridgeClients with that event.
+  */
+ @Test
+ public void doAfterClearCallsNotifyClientsWhenClientHaveInterests() {
+   RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+   FilterRoutingInfo routingInfo = mock(FilterRoutingInfo.class);
+   FilterProfile profile = mock(FilterProfile.class);
+   when(profile.getFilterRoutingInfoPart1(regionEvent, FilterProfile.NO_PROFILES,
+       Collections.emptySet())).thenReturn(routingInfo);
+   when(profile.getFilterRoutingInfoPart2(routingInfo, regionEvent)).thenReturn(routingInfo);
+   when(partitionedRegion.getFilterProfile()).thenReturn(profile);
+   when(partitionedRegion.hasAnyClientsInterested()).thenReturn(true);
+
+   partitionedRegionClear.doAfterClear(regionEvent);
+
+   verify(regionEvent).setLocalFilterInfo(any());
+   verify(partitionedRegion).notifyBridgeClients(regionEvent);
+ }
+
+ /**
+  * When the region reports a listener, doAfterClear should dispatch AFTER_REGION_CLEAR once with
+  * the given event.
+  */
+ @Test
+ public void doAfterClearDispatchesListenerEvents() {
+   when(partitionedRegion.hasListener()).thenReturn(true);
+   RegionEventImpl clearEvent = mock(RegionEventImpl.class);
+
+   partitionedRegionClear.doAfterClear(clearEvent);
+
+   verify(partitionedRegion)
+       .dispatchListenerEvent(EnumListenerEvent.AFTER_REGION_CLEAR, clearEvent);
+ }
+
+ /**
+  * With the requester reported as a current member, obtainClearLockLocal should record it as the
+  * lock requester and call lockLocallyForClear once on each bucket region from the helper.
+  */
+ @Test
+ public void obtainClearLockLocalGetsLockOnPrimaryBuckets() {
+   InternalDistributedMember requester = mock(InternalDistributedMember.class);
+   when(distributionManager.isCurrentMember(requester)).thenReturn(true);
+   BucketAdvisor advisor = mock(BucketAdvisor.class);
+   when(advisor.hasPrimary()).thenReturn(true);
+   PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class);
+   Set<BucketRegion> bucketRegions = setupBucketRegions(dataStore, advisor);
+   when(partitionedRegion.getDataStore()).thenReturn(dataStore);
+
+   partitionedRegionClear.obtainClearLockLocal(requester);
+
+   assertThat(partitionedRegionClear.lockForListenerAndClientNotification.getLockRequester())
+       .isSameAs(requester);
+   bucketRegions.forEach(bucket -> verify(bucket).lockLocallyForClear(
+       partitionedRegion.getDistributionManager(), partitionedRegion.getMyId(), null));
+ }
+
+ /**
+  * With the requester reported as not a current member, obtainClearLockLocal should leave the
+  * lock requester null and never call lockLocallyForClear on any bucket region.
+  */
+ @Test
+ public void obtainClearLockLocalDoesNotGetLocksOnPrimaryBucketsWhenMemberIsNotCurrent() {
+   InternalDistributedMember requester = mock(InternalDistributedMember.class);
+   when(distributionManager.isCurrentMember(requester)).thenReturn(false);
+   BucketAdvisor advisor = mock(BucketAdvisor.class);
+   when(advisor.hasPrimary()).thenReturn(true);
+   PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class);
+   Set<BucketRegion> bucketRegions = setupBucketRegions(dataStore, advisor);
+   when(partitionedRegion.getDataStore()).thenReturn(dataStore);
+
+   partitionedRegionClear.obtainClearLockLocal(requester);
+
+   assertThat(partitionedRegionClear.lockForListenerAndClientNotification.getLockRequester())
+       .isNull();
+   bucketRegions.forEach(bucket -> verify(bucket, times(0)).lockLocallyForClear(
+       partitionedRegion.getDistributionManager(), partitionedRegion.getMyId(), null));
+ }
+
+ /**
+  * After a requester has been set on the lock, releaseClearLockLocal should call
+  * releaseLockLocallyForClear(null) once on each bucket region from the helper.
+  */
+ @Test
+ public void releaseClearLockLocalReleasesLockOnPrimaryBuckets() {
+   InternalDistributedMember requester = mock(InternalDistributedMember.class);
+   when(distributionManager.isCurrentMember(requester)).thenReturn(true);
+   BucketAdvisor advisor = mock(BucketAdvisor.class);
+   when(advisor.hasPrimary()).thenReturn(true);
+   PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class);
+   Set<BucketRegion> bucketRegions = setupBucketRegions(dataStore, advisor);
+   when(partitionedRegion.getDataStore()).thenReturn(dataStore);
+   partitionedRegionClear.lockForListenerAndClientNotification.setLocked(requester);
+
+   partitionedRegionClear.releaseClearLockLocal();
+
+   bucketRegions.forEach(bucket -> verify(bucket).releaseLockLocallyForClear(null));
+ }
+
+ /**
+  * This test never sets a lock requester (assuming the fixture leaves it unset as well), so
+  * releaseClearLockLocal should leave the requester null and never call
+  * releaseLockLocallyForClear on any bucket region.
+  */
+ @Test
+ public void releaseClearLockLocalDoesNotReleaseLocksOnPrimaryBucketsWhenMemberIsNotCurrent() {
+   BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
+   when(bucketAdvisor.hasPrimary()).thenReturn(true);
+   PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class);
+   Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor);
+   when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore);
+
+   partitionedRegionClear.releaseClearLockLocal();
+
+   assertThat(partitionedRegionClear.lockForListenerAndClientNotification.getLockRequester())
+       .isNull();
+   for (BucketRegion bucketRegion : buckets) {
+     verify(bucketRegion, times(0)).releaseLockLocallyForClear(null);
+   }
+ }
+
+ /**
+  * Wires the region advisor, PR root configuration, distributed system and cache mocks around a
+  * single member, sends an OP_PR_CLEAR message and checks that putOutgoing is called once on the
+  * distribution manager.
+  */
+ @Test
+ public void sendPartitionedRegionClearMessageSendsClearMessageToPRNodes() {
+   RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+   when(regionEvent.clone()).thenReturn(mock(RegionEventImpl.class));
+
+   // A single member, returned by the region advisor and by the node in the PR root config.
+   InternalDistributedMember onlyMember = mock(InternalDistributedMember.class);
+   Set<InternalDistributedMember> advisedMembers = Collections.singleton(onlyMember);
+   RegionAdvisor advisor = mock(RegionAdvisor.class);
+   when(advisor.adviseAllPRNodes()).thenReturn(advisedMembers);
+   when(partitionedRegion.getRegionAdvisor()).thenReturn(advisor);
+
+   Node onlyNode = mock(Node.class);
+   when(onlyNode.getMemberId()).thenReturn(onlyMember);
+   Set<Node> nodesInConfig = Collections.singleton(onlyNode);
+   PartitionRegionConfig prConfig = mock(PartitionRegionConfig.class);
+   when(prConfig.getNodes()).thenReturn(nodesInConfig);
+   Region<String, PartitionRegionConfig> rootRegion = mock(Region.class);
+   when(rootRegion.get(any())).thenReturn(prConfig);
+   when(partitionedRegion.getPRRoot()).thenReturn(rootRegion);
+
+   InternalDistributedSystem system = mock(InternalDistributedSystem.class);
+   when(system.getDistributionManager()).thenReturn(distributionManager);
+   when(partitionedRegion.getSystem()).thenReturn(system);
+   when(distributionManager.getCancelCriterion()).thenReturn(mock(CancelCriterion.class));
+   when(distributionManager.getStats()).thenReturn(mock(DMStats.class));
+
+   TXManagerImpl txManager = mock(TXManagerImpl.class);
+   when(txManager.isDistributed()).thenReturn(false);
+   InternalCache cache = mock(InternalCache.class);
+   when(cache.getTxManager()).thenReturn(txManager);
+   when(partitionedRegion.getCache()).thenReturn(cache);
+
+   partitionedRegionClear.sendPartitionedRegionClearMessage(regionEvent,
+       PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR);
+
+   verify(distributionManager).putOutgoing(any());
+ }
+
+ /**
+  * Runs doClear on a spy whose lock acquisition, bucket assignment and clearRegion are stubbed,
+  * then checks that acquireDistributedClearLock, releaseDistributedClearLock and
+  * assignAllPrimaryBuckets are each called once.
+  */
+ @Test
+ public void doClearAcquiresAndReleasesDistributedClearLockAndCreatesAllPrimaryBuckets() {
+   RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+   PartitionedRegionClear clearSpy = spy(partitionedRegionClear);
+   doNothing().when(clearSpy).acquireDistributedClearLock(any());
+   doNothing().when(clearSpy).assignAllPrimaryBuckets();
+   doReturn(Collections.emptyList()).when(clearSpy).clearRegion(regionEvent, false, null);
+
+   clearSpy.doClear(regionEvent, false);
+
+   verify(clearSpy).acquireDistributedClearLock(any());
+   verify(clearSpy).releaseDistributedClearLock(any());
+   verify(clearSpy).assignAllPrimaryBuckets();
+ }
+
+ /**
+  * Checks that doClear called with cacheWrite set to true calls invokeCacheWriter once with the
+  * event.
+  */
+ @Test
+ public void doClearInvokesCacheWriterWhenCacheWriteIsSet() {
+   boolean cacheWrite = true;
+   RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+   PartitionedRegionClear clearSpy = spy(partitionedRegionClear);
+   doNothing().when(clearSpy).acquireDistributedClearLock(any());
+   doNothing().when(clearSpy).assignAllPrimaryBuckets();
+   doReturn(Collections.emptyList()).when(clearSpy).clearRegion(regionEvent, cacheWrite, null);
+
+   clearSpy.doClear(regionEvent, cacheWrite);
+
+   verify(clearSpy).invokeCacheWriter(regionEvent);
+ }
+
+ /**
+  * Checks that doClear called with cacheWrite set to false never calls invokeCacheWriter with
+  * the event.
+  */
+ @Test
+ public void doClearDoesNotInvokesCacheWriterWhenCacheWriteIsNotSet() {
+   boolean cacheWrite = false;
+   RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+   PartitionedRegionClear clearSpy = spy(partitionedRegionClear);
+   doNothing().when(clearSpy).acquireDistributedClearLock(any());
+   doNothing().when(clearSpy).assignAllPrimaryBuckets();
+   doReturn(Collections.emptyList()).when(clearSpy).clearRegion(regionEvent, cacheWrite, null);
+
+   clearSpy.doClear(regionEvent, cacheWrite);
+
+   verify(clearSpy, times(0)).invokeCacheWriter(regionEvent);
+ }
+
+ /**
+  * When the region reports a listener but no client interest, doClear should call
+  * obtainLockForClear and releaseLockForClear once each with the event.
+  */
+ @Test
+ public void doClearObtainsAndReleasesLockForClearWhenRegionHasListener() {
+   boolean cacheWrite = false;
+   RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+   when(partitionedRegion.hasListener()).thenReturn(true);
+   when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false);
+   PartitionedRegionClear clearSpy = spy(partitionedRegionClear);
+   doNothing().when(clearSpy).acquireDistributedClearLock(any());
+   doNothing().when(clearSpy).assignAllPrimaryBuckets();
+   doNothing().when(clearSpy).obtainLockForClear(regionEvent);
+   doNothing().when(clearSpy).releaseLockForClear(regionEvent);
+   doReturn(Collections.emptyList()).when(clearSpy).clearRegion(regionEvent, cacheWrite, null);
+
+   clearSpy.doClear(regionEvent, cacheWrite);
+
+   verify(clearSpy).obtainLockForClear(regionEvent);
+   verify(clearSpy).releaseLockForClear(regionEvent);
+ }
+
+ /**
+  * When the region reports client interest but no listener, doClear should call
+  * obtainLockForClear and releaseLockForClear once each with the event.
+  */
+ @Test
+ public void doClearObtainsAndReleasesLockForClearWhenRegionHasClientInterest() {
+   boolean cacheWrite = false;
+   RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+   when(partitionedRegion.hasListener()).thenReturn(false);
+   when(partitionedRegion.hasAnyClientsInterested()).thenReturn(true);
+   PartitionedRegionClear clearSpy = spy(partitionedRegionClear);
+   doNothing().when(clearSpy).acquireDistributedClearLock(any());
+   doNothing().when(clearSpy).assignAllPrimaryBuckets();
+   doNothing().when(clearSpy).obtainLockForClear(regionEvent);
+   doNothing().when(clearSpy).releaseLockForClear(regionEvent);
+   doReturn(Collections.emptyList()).when(clearSpy).clearRegion(regionEvent, cacheWrite, null);
+
+   clearSpy.doClear(regionEvent, cacheWrite);
+
+   verify(clearSpy).obtainLockForClear(regionEvent);
+   verify(clearSpy).releaseLockForClear(regionEvent);
+ }
+
+ /**
+  * When the region reports neither a listener nor client interest, doClear should never call
+  * obtainLockForClear or releaseLockForClear with the event.
+  */
+ @Test
+ public void doClearDoesNotObtainLockForClearWhenRegionHasNoListenerAndNoClientInterest() {
+   boolean cacheWrite = false;
+   RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+   when(partitionedRegion.hasListener()).thenReturn(false);
+   when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false);
+   PartitionedRegionClear clearSpy = spy(partitionedRegionClear);
+   doNothing().when(clearSpy).acquireDistributedClearLock(any());
+   doNothing().when(clearSpy).assignAllPrimaryBuckets();
+   doNothing().when(clearSpy).obtainLockForClear(regionEvent);
+   doNothing().when(clearSpy).releaseLockForClear(regionEvent);
+   doReturn(Collections.emptyList()).when(clearSpy).clearRegion(regionEvent, cacheWrite, null);
+
+   clearSpy.doClear(regionEvent, cacheWrite);
+
+   verify(clearSpy, times(0)).obtainLockForClear(regionEvent);
+   verify(clearSpy, times(0)).releaseLockForClear(regionEvent);
+ }
+
+ /**
+  * With the region reporting one bucket in total and clearRegion stubbed to return an empty
+  * list, doClear must throw PartitionedRegionPartialClearException with the expected message.
+  */
+ @Test
+ public void doClearThrowsPartitionedRegionPartialClearException() {
+   boolean cacheWrite = false;
+   RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+   when(partitionedRegion.hasListener()).thenReturn(false);
+   when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false);
+   when(partitionedRegion.getTotalNumberOfBuckets()).thenReturn(1);
+   when(partitionedRegion.getName()).thenReturn("prRegion");
+   PartitionedRegionClear clearSpy = spy(partitionedRegionClear);
+   doNothing().when(clearSpy).acquireDistributedClearLock(any());
+   doNothing().when(clearSpy).assignAllPrimaryBuckets();
+   doNothing().when(clearSpy).obtainLockForClear(regionEvent);
+   doNothing().when(clearSpy).releaseLockForClear(regionEvent);
+   // An empty result is fewer cleared buckets than the single bucket the region reports.
+   doReturn(Collections.emptyList()).when(clearSpy).clearRegion(regionEvent, cacheWrite, null);
+
+   Throwable thrown = catchThrowable(() -> clearSpy.doClear(regionEvent, cacheWrite));
+
+   assertThat(thrown)
+       .isInstanceOf(PartitionedRegionPartialClearException.class)
+       .hasMessage(
+           "Unable to clear all the buckets from the partitioned region prRegion, either data (buckets) moved or member departed.");
+ }
+
+ /**
+  * When the departed member is the one set as lock requester, handleClearFromDepartedMember
+  * should call releaseClearLockLocal once.
+  */
+ @Test
+ public void handleClearFromDepartedMemberReleasesTheLockForRequesterDeparture() {
+   InternalDistributedMember departedRequester = mock(InternalDistributedMember.class);
+   partitionedRegionClear.lockForListenerAndClientNotification.setLocked(departedRequester);
+   PartitionedRegionClear clearSpy = spy(partitionedRegionClear);
+
+   clearSpy.handleClearFromDepartedMember(departedRequester);
+
+   verify(clearSpy).releaseClearLockLocal();
+ }
+
+ /**
+  * When the departed member differs from the one set as lock requester,
+  * handleClearFromDepartedMember should never call releaseClearLockLocal.
+  */
+ @Test
+ public void handleClearFromDepartedMemberDoesNotReleasesTheLockForNonRequesterDeparture() {
+   InternalDistributedMember lockHolder = mock(InternalDistributedMember.class);
+   InternalDistributedMember departedMember = mock(InternalDistributedMember.class);
+   partitionedRegionClear.lockForListenerAndClientNotification.setLocked(lockHolder);
+   PartitionedRegionClear clearSpy = spy(partitionedRegionClear);
+
+   clearSpy.handleClearFromDepartedMember(departedMember);
+
+   verify(clearSpy, times(0)).releaseClearLockLocal();
+ }
+
+ /**
+  * Checks that the listener returned by getPartitionedRegionClearListener was added to the
+  * distribution manager exactly once.
+  */
+ @Test
+ public void partitionedRegionClearRegistersMembershipListener() {
+   MembershipListener registeredListener =
+       partitionedRegionClear.getPartitionedRegionClearListener();
+
+   verify(distributionManager).addMembershipListener(registeredListener);
+ }
+
+ /**
+  * After memberDeparted is delivered for the member set as lock requester, the membership-change
+  * flag should be true and the lock requester should be null.
+  */
+ @Test
+ public void lockRequesterDepartureReleasesTheLock() {
+   InternalDistributedMember departedRequester = mock(InternalDistributedMember.class);
+   partitionedRegionClear.lockForListenerAndClientNotification.setLocked(departedRequester);
+   PartitionedRegionClear.PartitionedRegionClearListener clearListener =
+       partitionedRegionClear.getPartitionedRegionClearListener();
+
+   clearListener.memberDeparted(distributionManager, departedRequester, true);
+
+   assertThat(partitionedRegionClear.getMembershipChange()).isTrue();
+   assertThat(partitionedRegionClear.lockForListenerAndClientNotification.getLockRequester())
+       .isNull();
+ }
+
+ /**
+  * After memberDeparted is delivered for a member other than the lock requester, the
+  * membership-change flag should be true and the lock requester should still be set.
+  */
+ @Test
+ public void nonLockRequesterDepartureDoesNotReleasesTheLock() {
+   InternalDistributedMember lockHolder = mock(InternalDistributedMember.class);
+   InternalDistributedMember departedMember = mock(InternalDistributedMember.class);
+   partitionedRegionClear.lockForListenerAndClientNotification.setLocked(lockHolder);
+   PartitionedRegionClear.PartitionedRegionClearListener clearListener =
+       partitionedRegionClear.getPartitionedRegionClearListener();
+
+   clearListener.memberDeparted(distributionManager, departedMember, true);
+
+   assertThat(partitionedRegionClear.getMembershipChange()).isTrue();
+   assertThat(partitionedRegionClear.lockForListenerAndClientNotification.getLockRequester())
+       .isNotNull();
+ }
+}