You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2020/10/02 19:29:00 UTC
[geode] 14/19: GEODE-8334: PR.clear should sync with putAll or removeAll on rvvLock (#5365)
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
commit eaa1322c769bb236c46330cddce6d06daaaa5f83
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Fri Jul 10 08:59:26 2020 -0700
GEODE-8334: PR.clear should sync with putAll or removeAll on rvvLock (#5365)
Co-authored-by: Xiaojian Zhou <zh...@vmware.com>
Co-authored-by: Anil Gingade <ag...@vmware.com>
---
.../apache/geode/internal/cache/BucketRegion.java | 8 +-----
.../cache/partitioned/PutAllPRMessage.java | 9 +++++++
.../cache/partitioned/RemoveAllPRMessage.java | 9 +++++++
.../cache/partitioned/PutAllPRMessageTest.java | 29 ++++++++++++++++++++++
.../cache/partitioned/RemoveAllPRMessageTest.java | 29 ++++++++++++++++++++++
5 files changed, 77 insertions(+), 7 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 3329e42..454db5c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -569,12 +569,6 @@ public class BucketRegion extends DistributedRegion implements Bucket {
return;
}
- boolean enableRVV = useRVV && getConcurrencyChecksEnabled();
- RegionVersionVector rvv = null;
- if (enableRVV) {
- rvv = getVersionVector().getCloneForTransmission();
- }
-
// get rvvLock
Set<InternalDistributedMember> participants =
getCacheDistributionAdvisor().adviseInvalidateRegion();
@@ -588,7 +582,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
// no need to dominate my own rvv.
// Clear is on going here, there won't be GII for this member
clearRegionLocally(regionEvent, cacheWrite, null);
- distributeClearOperation(regionEvent, rvv, participants);
+ distributeClearOperation(regionEvent, null, participants);
// TODO: call reindexUserDataRegion if there're lucene indexes
} finally {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
index 0c690c5..6bb666c 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
@@ -413,6 +413,7 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
Object[] keys = getKeysToBeLocked();
if (!notificationOnly) {
boolean locked = false;
+ boolean rvvLocked = false;
try {
if (putAllPRData.length > 0) {
if (this.posDup && bucketRegion.getConcurrencyChecksEnabled()) {
@@ -438,6 +439,10 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
bucketRegion.recordBulkOpStart(membershipID, eventID);
}
locked = bucketRegion.waitUntilLocked(keys);
+ if (!rvvLocked) {
+ bucketRegion.lockRVVForBulkOp();
+ rvvLocked = true;
+ }
boolean lockedForPrimary = false;
final HashMap succeeded = new HashMap();
PutAllPartialResult partialKeys = new PutAllPartialResult(putAllPRDataSize);
@@ -518,6 +523,10 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
} catch (RegionDestroyedException e) {
ds.checkRegionDestroyedOnBucket(bucketRegion, true, e);
} finally {
+ if (rvvLocked) {
+ bucketRegion.unlockRVVForBulkOp();
+ rvvLocked = false;
+ }
if (locked) {
bucketRegion.removeAndNotifyKeys(keys);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
index 6f355d6..f295136 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
@@ -406,6 +406,7 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
if (!notificationOnly) {
boolean locked = false;
+ boolean rvvLocked = false;
try {
if (removeAllPRData.length > 0) {
if (this.posDup && bucketRegion.getConcurrencyChecksEnabled()) {
@@ -431,6 +432,10 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
bucketRegion.recordBulkOpStart(membershipID, eventID);
}
locked = bucketRegion.waitUntilLocked(keys);
+ if (!rvvLocked) {
+ bucketRegion.lockRVVForBulkOp();
+ rvvLocked = true;
+ }
boolean lockedForPrimary = false;
final ArrayList<Object> succeeded = new ArrayList<Object>();
PutAllPartialResult partialKeys = new PutAllPartialResult(removeAllPRDataSize);
@@ -526,6 +531,10 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
} catch (RegionDestroyedException e) {
ds.checkRegionDestroyedOnBucket(bucketRegion, true, e);
} finally {
+ if (rvvLocked) {
+ bucketRegion.unlockRVVForBulkOp();
+ rvvLocked = false;
+ }
if (locked) {
bucketRegion.removeAndNotifyKeys(keys);
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java
index ab82a93..f5480a5 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java
@@ -15,9 +15,11 @@
package org.apache.geode.internal.cache.partitioned;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
@@ -119,4 +121,31 @@ public class PutAllPRMessageTest {
eq(regionDestroyedException));
}
+ @Test
+ public void rvvLockedAfterKeysAreLockedAndUnlockRVVBeforeKeys() throws Exception {
+ PutAllPRMessage message = spy(new PutAllPRMessage(bucketId, 1, false, false, false, null));
+ message.addEntry(entryData);
+ doReturn(keys).when(message).getKeysToBeLocked();
+ when(bucketRegion.waitUntilLocked(keys)).thenReturn(true);
+ when(bucketRegion.doLockForPrimary(false)).thenThrow(new PrimaryBucketException());
+ doNothing().when(bucketRegion).lockRVVForBulkOp();
+ doNothing().when(bucketRegion).unlockRVVForBulkOp();
+
+ InternalCache cache = mock(InternalCache.class);
+ InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+ when(bucketRegion.getCache()).thenReturn(cache);
+ when(cache.getDistributedSystem()).thenReturn(ids);
+ when(ids.getOffHeapStore()).thenReturn(null);
+
+ assertThatThrownBy(
+ () -> message.doLocalPutAll(partitionedRegion, mock(InternalDistributedMember.class), 1))
+ .isInstanceOf(PrimaryBucketException.class);
+
+ InOrder inOrder = inOrder(bucketRegion);
+ inOrder.verify(bucketRegion).waitUntilLocked(keys);
+ inOrder.verify(bucketRegion).lockRVVForBulkOp();
+ inOrder.verify(bucketRegion).unlockRVVForBulkOp();
+ inOrder.verify(bucketRegion).removeAndNotifyKeys(keys);
+ }
+
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java
index 2309cb0..a3ee31b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java
@@ -14,9 +14,11 @@
*/
package org.apache.geode.internal.cache.partitioned;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
@@ -131,4 +133,31 @@ public class RemoveAllPRMessageTest {
verify(dataStore).checkRegionDestroyedOnBucket(eq(bucketRegion), eq(true),
eq(regionDestroyedException));
}
+
+ @Test
+ public void rvvLockedAfterKeysAreLockedAndUnlockRVVBeforeKeys() throws Exception {
+ RemoveAllPRMessage message =
+ spy(new RemoveAllPRMessage(bucketId, 1, false, false, false, null));
+ message.addEntry(entryData);
+ doReturn(keys).when(message).getKeysToBeLocked();
+ when(bucketRegion.waitUntilLocked(keys)).thenReturn(true);
+ when(bucketRegion.doLockForPrimary(false)).thenThrow(new PrimaryBucketException());
+ doNothing().when(bucketRegion).lockRVVForBulkOp();
+ doNothing().when(bucketRegion).unlockRVVForBulkOp();
+
+ InternalCache cache = mock(InternalCache.class);
+ InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+ when(bucketRegion.getCache()).thenReturn(cache);
+ when(cache.getDistributedSystem()).thenReturn(ids);
+ when(ids.getOffHeapStore()).thenReturn(null);
+
+ assertThatThrownBy(() -> message.doLocalRemoveAll(partitionedRegion,
+ mock(InternalDistributedMember.class), true)).isInstanceOf(PrimaryBucketException.class);
+
+ InOrder inOrder = inOrder(bucketRegion);
+ inOrder.verify(bucketRegion).waitUntilLocked(keys);
+ inOrder.verify(bucketRegion).lockRVVForBulkOp();
+ inOrder.verify(bucketRegion).unlockRVVForBulkOp();
+ inOrder.verify(bucketRegion).removeAndNotifyKeys(keys);
+ }
}