You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2020/07/09 22:44:13 UTC

[geode] branch feature/GEODE-8334 created (now 4e67053)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a change to branch feature/GEODE-8334
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at 4e67053  GEODE-8334: PR.clear should sync with putAll or removeAll on rvvLock

This branch includes the following new commits:

     new 4e67053  GEODE-8334: PR.clear should sync with putAll or removeAll on rvvLock

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 01/01: GEODE-8334: PR.clear should sync with putAll or removeAll on rvvLock

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-8334
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 4e6705335950d30fcc1e5831ca2d09612e47b78f
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Thu Jul 9 15:41:01 2020 -0700

    GEODE-8334: PR.clear should sync with putAll or removeAll on rvvLock
---
 .../apache/geode/internal/cache/BucketRegion.java  |  8 +----
 .../cache/partitioned/PutAllPRMessage.java         |  9 ++++++
 .../cache/partitioned/RemoveAllPRMessage.java      |  9 ++++++
 .../cache/partitioned/PutAllPRMessageTest.java     | 33 +++++++++++++++++++++
 .../cache/partitioned/RemoveAllPRMessageTest.java  | 34 ++++++++++++++++++++++
 5 files changed, 86 insertions(+), 7 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index c37e1a3..6793ecb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -566,12 +566,6 @@ public class BucketRegion extends DistributedRegion implements Bucket {
       return;
     }
 
-    boolean enableRVV = useRVV && getConcurrencyChecksEnabled();
-    RegionVersionVector rvv = null;
-    if (enableRVV) {
-      rvv = getVersionVector().getCloneForTransmission();
-    }
-
     // get rvvLock
     Set<InternalDistributedMember> participants =
         getCacheDistributionAdvisor().adviseInvalidateRegion();
@@ -585,7 +579,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
       // no need to dominate my own rvv.
       // Clear is on going here, there won't be GII for this member
       clearRegionLocally(regionEvent, cacheWrite, null);
-      distributeClearOperation(regionEvent, rvv, participants);
+      distributeClearOperation(regionEvent, null, participants);
 
       // TODO: call reindexUserDataRegion if there're lucene indexes
     } finally {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
index 0c690c5..6bb666c 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
@@ -413,6 +413,7 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
       Object[] keys = getKeysToBeLocked();
       if (!notificationOnly) {
         boolean locked = false;
+        boolean rvvLocked = false;
         try {
           if (putAllPRData.length > 0) {
             if (this.posDup && bucketRegion.getConcurrencyChecksEnabled()) {
@@ -438,6 +439,10 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
             bucketRegion.recordBulkOpStart(membershipID, eventID);
           }
           locked = bucketRegion.waitUntilLocked(keys);
+          if (!rvvLocked) {
+            bucketRegion.lockRVVForBulkOp();
+            rvvLocked = true;
+          }
           boolean lockedForPrimary = false;
           final HashMap succeeded = new HashMap();
           PutAllPartialResult partialKeys = new PutAllPartialResult(putAllPRDataSize);
@@ -518,6 +523,10 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
         } catch (RegionDestroyedException e) {
           ds.checkRegionDestroyedOnBucket(bucketRegion, true, e);
         } finally {
+          if (rvvLocked) {
+            bucketRegion.unlockRVVForBulkOp();
+            rvvLocked = false;
+          }
           if (locked) {
             bucketRegion.removeAndNotifyKeys(keys);
           }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
index 6f355d6..f295136 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
@@ -406,6 +406,7 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
 
       if (!notificationOnly) {
         boolean locked = false;
+        boolean rvvLocked = false;
         try {
           if (removeAllPRData.length > 0) {
             if (this.posDup && bucketRegion.getConcurrencyChecksEnabled()) {
@@ -431,6 +432,10 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
             bucketRegion.recordBulkOpStart(membershipID, eventID);
           }
           locked = bucketRegion.waitUntilLocked(keys);
+          if (!rvvLocked) {
+            bucketRegion.lockRVVForBulkOp();
+            rvvLocked = true;
+          }
           boolean lockedForPrimary = false;
           final ArrayList<Object> succeeded = new ArrayList<Object>();
           PutAllPartialResult partialKeys = new PutAllPartialResult(removeAllPRDataSize);
@@ -526,6 +531,10 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
         } catch (RegionDestroyedException e) {
           ds.checkRegionDestroyedOnBucket(bucketRegion, true, e);
         } finally {
+          if (rvvLocked) {
+            bucketRegion.unlockRVVForBulkOp();
+            rvvLocked = false;
+          }
           if (locked) {
             bucketRegion.removeAndNotifyKeys(keys);
           }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java
index ab82a93..c5dd140 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java
@@ -15,9 +15,12 @@
 package org.apache.geode.internal.cache.partitioned;
 
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -119,4 +122,34 @@ public class PutAllPRMessageTest {
         eq(regionDestroyedException));
   }
 
+  @Test
+  public void rvvLockedAfterKeysAreLockedAndUnlockRVVBeforeKeys() throws Exception {
+    PutAllPRMessage message = spy(new PutAllPRMessage(bucketId, 1, false, false, false, null));
+    message.addEntry(entryData);
+    doReturn(keys).when(message).getKeysToBeLocked();
+    when(bucketRegion.waitUntilLocked(keys)).thenReturn(true);
+    when(bucketRegion.doLockForPrimary(false)).thenThrow(new PrimaryBucketException());
+    doNothing().when(bucketRegion).lockRVVForBulkOp();
+    doNothing().when(bucketRegion).unlockRVVForBulkOp();
+
+    InternalCache cache = mock(InternalCache.class);
+    InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+    when(bucketRegion.getCache()).thenReturn(cache);
+    when(cache.getDistributedSystem()).thenReturn(ids);
+    when(ids.getOffHeapStore()).thenReturn(null);
+
+    try {
+      message.doLocalPutAll(partitionedRegion, mock(InternalDistributedMember.class), 1);
+      fail("Expect PrimaryBucketException");
+    } catch (Exception e) {
+      assertThat(e instanceof PrimaryBucketException);
+    }
+
+    InOrder inOrder = inOrder(bucketRegion);
+    inOrder.verify(bucketRegion).waitUntilLocked(keys);
+    inOrder.verify(bucketRegion).lockRVVForBulkOp();
+    inOrder.verify(bucketRegion).unlockRVVForBulkOp();
+    inOrder.verify(bucketRegion).removeAndNotifyKeys(keys);
+  }
+
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java
index 2309cb0..19d508e 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java
@@ -14,9 +14,12 @@
  */
 package org.apache.geode.internal.cache.partitioned;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -131,4 +134,35 @@ public class RemoveAllPRMessageTest {
     verify(dataStore).checkRegionDestroyedOnBucket(eq(bucketRegion), eq(true),
         eq(regionDestroyedException));
   }
+
+  @Test
+  public void rvvLockedAfterKeysAreLockedAndUnlockRVVBeforeKeys() throws Exception {
+    RemoveAllPRMessage message =
+        spy(new RemoveAllPRMessage(bucketId, 1, false, false, false, null));
+    message.addEntry(entryData);
+    doReturn(keys).when(message).getKeysToBeLocked();
+    when(bucketRegion.waitUntilLocked(keys)).thenReturn(true);
+    when(bucketRegion.doLockForPrimary(false)).thenThrow(new PrimaryBucketException());
+    doNothing().when(bucketRegion).lockRVVForBulkOp();
+    doNothing().when(bucketRegion).unlockRVVForBulkOp();
+
+    InternalCache cache = mock(InternalCache.class);
+    InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+    when(bucketRegion.getCache()).thenReturn(cache);
+    when(cache.getDistributedSystem()).thenReturn(ids);
+    when(ids.getOffHeapStore()).thenReturn(null);
+
+    try {
+      message.doLocalRemoveAll(partitionedRegion, mock(InternalDistributedMember.class), true);
+      fail("Expect PrimaryBucketException");
+    } catch (Exception e) {
+      assertThat(e instanceof PrimaryBucketException);
+    }
+
+    InOrder inOrder = inOrder(bucketRegion);
+    inOrder.verify(bucketRegion).waitUntilLocked(keys);
+    inOrder.verify(bucketRegion).lockRVVForBulkOp();
+    inOrder.verify(bucketRegion).unlockRVVForBulkOp();
+    inOrder.verify(bucketRegion).removeAndNotifyKeys(keys);
+  }
 }