You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@geode.apache.org by ji...@apache.org on 2020/10/02 19:29:00 UTC

[geode] 14/19: GEODE-8334: PR.clear should sync with putAll or removeAll on rvvLock (#5365)

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git

commit eaa1322c769bb236c46330cddce6d06daaaa5f83
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Fri Jul 10 08:59:26 2020 -0700

    GEODE-8334: PR.clear should sync with putAll or removeAll on rvvLock (#5365)
    
    
        Co-authored-by: Xiaojian Zhou <zh...@vmware.com>
        Co-authored-by: Anil Gingade <ag...@vmware.com>
---
 .../apache/geode/internal/cache/BucketRegion.java  |  8 +-----
 .../cache/partitioned/PutAllPRMessage.java         |  9 +++++++
 .../cache/partitioned/RemoveAllPRMessage.java      |  9 +++++++
 .../cache/partitioned/PutAllPRMessageTest.java     | 29 ++++++++++++++++++++++
 .../cache/partitioned/RemoveAllPRMessageTest.java  | 29 ++++++++++++++++++++++
 5 files changed, 77 insertions(+), 7 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 3329e42..454db5c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -569,12 +569,6 @@ public class BucketRegion extends DistributedRegion implements Bucket {
       return;
     }
 
-    boolean enableRVV = useRVV && getConcurrencyChecksEnabled();
-    RegionVersionVector rvv = null;
-    if (enableRVV) {
-      rvv = getVersionVector().getCloneForTransmission();
-    }
-
     // get rvvLock
     Set<InternalDistributedMember> participants =
         getCacheDistributionAdvisor().adviseInvalidateRegion();
@@ -588,7 +582,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
       // no need to dominate my own rvv.
       // Clear is on going here, there won't be GII for this member
       clearRegionLocally(regionEvent, cacheWrite, null);
-      distributeClearOperation(regionEvent, rvv, participants);
+      distributeClearOperation(regionEvent, null, participants);
 
       // TODO: call reindexUserDataRegion if there're lucene indexes
     } finally {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
index 0c690c5..6bb666c 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
@@ -413,6 +413,7 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
       Object[] keys = getKeysToBeLocked();
       if (!notificationOnly) {
         boolean locked = false;
+        boolean rvvLocked = false;
         try {
           if (putAllPRData.length > 0) {
             if (this.posDup && bucketRegion.getConcurrencyChecksEnabled()) {
@@ -438,6 +439,10 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
             bucketRegion.recordBulkOpStart(membershipID, eventID);
           }
           locked = bucketRegion.waitUntilLocked(keys);
+          if (!rvvLocked) {
+            bucketRegion.lockRVVForBulkOp();
+            rvvLocked = true;
+          }
           boolean lockedForPrimary = false;
           final HashMap succeeded = new HashMap();
           PutAllPartialResult partialKeys = new PutAllPartialResult(putAllPRDataSize);
@@ -518,6 +523,10 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
         } catch (RegionDestroyedException e) {
           ds.checkRegionDestroyedOnBucket(bucketRegion, true, e);
         } finally {
+          if (rvvLocked) {
+            bucketRegion.unlockRVVForBulkOp();
+            rvvLocked = false;
+          }
           if (locked) {
             bucketRegion.removeAndNotifyKeys(keys);
           }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
index 6f355d6..f295136 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
@@ -406,6 +406,7 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
 
       if (!notificationOnly) {
         boolean locked = false;
+        boolean rvvLocked = false;
         try {
           if (removeAllPRData.length > 0) {
             if (this.posDup && bucketRegion.getConcurrencyChecksEnabled()) {
@@ -431,6 +432,10 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
             bucketRegion.recordBulkOpStart(membershipID, eventID);
           }
           locked = bucketRegion.waitUntilLocked(keys);
+          if (!rvvLocked) {
+            bucketRegion.lockRVVForBulkOp();
+            rvvLocked = true;
+          }
           boolean lockedForPrimary = false;
           final ArrayList<Object> succeeded = new ArrayList<Object>();
           PutAllPartialResult partialKeys = new PutAllPartialResult(removeAllPRDataSize);
@@ -526,6 +531,10 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
         } catch (RegionDestroyedException e) {
           ds.checkRegionDestroyedOnBucket(bucketRegion, true, e);
         } finally {
+          if (rvvLocked) {
+            bucketRegion.unlockRVVForBulkOp();
+            rvvLocked = false;
+          }
           if (locked) {
             bucketRegion.removeAndNotifyKeys(keys);
           }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java
index ab82a93..f5480a5 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java
@@ -15,9 +15,11 @@
 package org.apache.geode.internal.cache.partitioned;
 
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -119,4 +121,31 @@ public class PutAllPRMessageTest {
         eq(regionDestroyedException));
   }
 
+  @Test
+  public void rvvLockedAfterKeysAreLockedAndUnlockRVVBeforeKeys() throws Exception {
+    PutAllPRMessage message = spy(new PutAllPRMessage(bucketId, 1, false, false, false, null));
+    message.addEntry(entryData);
+    doReturn(keys).when(message).getKeysToBeLocked();
+    when(bucketRegion.waitUntilLocked(keys)).thenReturn(true);
+    when(bucketRegion.doLockForPrimary(false)).thenThrow(new PrimaryBucketException());
+    doNothing().when(bucketRegion).lockRVVForBulkOp();
+    doNothing().when(bucketRegion).unlockRVVForBulkOp();
+
+    InternalCache cache = mock(InternalCache.class);
+    InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+    when(bucketRegion.getCache()).thenReturn(cache);
+    when(cache.getDistributedSystem()).thenReturn(ids);
+    when(ids.getOffHeapStore()).thenReturn(null);
+
+    assertThatThrownBy(
+        () -> message.doLocalPutAll(partitionedRegion, mock(InternalDistributedMember.class), 1))
+            .isInstanceOf(PrimaryBucketException.class);
+
+    InOrder inOrder = inOrder(bucketRegion);
+    inOrder.verify(bucketRegion).waitUntilLocked(keys);
+    inOrder.verify(bucketRegion).lockRVVForBulkOp();
+    inOrder.verify(bucketRegion).unlockRVVForBulkOp();
+    inOrder.verify(bucketRegion).removeAndNotifyKeys(keys);
+  }
+
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java
index 2309cb0..a3ee31b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java
@@ -14,9 +14,11 @@
  */
 package org.apache.geode.internal.cache.partitioned;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -131,4 +133,31 @@ public class RemoveAllPRMessageTest {
     verify(dataStore).checkRegionDestroyedOnBucket(eq(bucketRegion), eq(true),
         eq(regionDestroyedException));
   }
+
+  @Test
+  public void rvvLockedAfterKeysAreLockedAndUnlockRVVBeforeKeys() throws Exception {
+    RemoveAllPRMessage message =
+        spy(new RemoveAllPRMessage(bucketId, 1, false, false, false, null));
+    message.addEntry(entryData);
+    doReturn(keys).when(message).getKeysToBeLocked();
+    when(bucketRegion.waitUntilLocked(keys)).thenReturn(true);
+    when(bucketRegion.doLockForPrimary(false)).thenThrow(new PrimaryBucketException());
+    doNothing().when(bucketRegion).lockRVVForBulkOp();
+    doNothing().when(bucketRegion).unlockRVVForBulkOp();
+
+    InternalCache cache = mock(InternalCache.class);
+    InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+    when(bucketRegion.getCache()).thenReturn(cache);
+    when(cache.getDistributedSystem()).thenReturn(ids);
+    when(ids.getOffHeapStore()).thenReturn(null);
+
+    assertThatThrownBy(() -> message.doLocalRemoveAll(partitionedRegion,
+        mock(InternalDistributedMember.class), true)).isInstanceOf(PrimaryBucketException.class);
+
+    InOrder inOrder = inOrder(bucketRegion);
+    inOrder.verify(bucketRegion).waitUntilLocked(keys);
+    inOrder.verify(bucketRegion).lockRVVForBulkOp();
+    inOrder.verify(bucketRegion).unlockRVVForBulkOp();
+    inOrder.verify(bucketRegion).removeAndNotifyKeys(keys);
+  }
 }