You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2022/04/27 17:58:13 UTC

[geode] 01/01: GEODE-10242: Do not release primary lock prematurely

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-10242
in repository https://gitbox.apache.org/repos/asf/geode.git

commit d6f4e1f61aad5bcaac8d27b100a80ea645f53bf4
Author: Eric Shu <es...@pivotal.io>
AuthorDate: Wed Apr 27 10:45:51 2022 -0700

    GEODE-10242: Do not release primary lock prematurely
    
     * When depose primary during rebalance, do not release the primary lock
       before all colocated child buckets has deposed primary. This is to
       ensure that the node becomes new primary can only acquire the primary
       lock afterwards.
     * All colocated buckets now share the same primaryMoveReadWriteLock.
       When parent bucket is being moved, no operations will be executed on
       child buckets as well. So moving primary for all colocated buckets
       shold be faster, and there is no need to hold parent locks anymore.
---
 .../apache/geode/internal/cache/BucketAdvisor.java | 42 ++++++------
 .../apache/geode/internal/cache/BucketRegion.java  | 39 ++---------
 .../geode/internal/cache/BucketAdvisorTest.java    | 80 +++++++++++++++++-----
 3 files changed, 93 insertions(+), 68 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index 2b70f868d2..ea012690ba 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -43,6 +43,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.TestOnly;
 
 import org.apache.geode.CancelException;
 import org.apache.geode.DataSerializer;
@@ -142,9 +143,9 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
    * A read/write lock to prevent making this bucket not primary while a write is in progress on the
    * bucket.
    */
-  private final ReadWriteLock primaryMoveReadWriteLock = new ReentrantReadWriteLock();
-  private final Lock primaryMoveReadLock = primaryMoveReadWriteLock.readLock();
-  private final Lock primaryMoveWriteLock = primaryMoveReadWriteLock.writeLock();
+  private final ReadWriteLock primaryMoveReadWriteLock;
+  private final Lock primaryMoveReadLock;
+  private final Lock primaryMoveWriteLock;
 
   /**
    * The advisor for the bucket region that we are colocated with, if this region is a colocated
@@ -181,6 +182,14 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
     redundancyTracker =
         new BucketRedundancyTracker(pRegion.getRedundantCopies(), pRegion.getRedundancyTracker());
     resetParentAdvisor(bucket.getId());
+
+    if (parentAdvisor == null) {
+      primaryMoveReadWriteLock = new ReentrantReadWriteLock();
+    } else {
+      primaryMoveReadWriteLock = parentAdvisor.primaryMoveReadWriteLock;
+    }
+    primaryMoveReadLock = primaryMoveReadWriteLock.readLock();
+    primaryMoveWriteLock = primaryMoveReadWriteLock.writeLock();
   }
 
   public static BucketAdvisor createBucketAdvisor(Bucket bucket, RegionAdvisor regionAdvisor) {
@@ -240,19 +249,6 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
     return primaryMoveReadLock;
   }
 
-  /**
-   * Returns the lock that prevents the parent's primary from moving while active writes are in
-   * progress. This should be locked before checking if the local bucket is primary.
-   *
-   * @return the lock for in-progress write operations
-   */
-  Lock getParentPrimaryMoveReadLock() {
-    if (parentAdvisor != null) {
-      return parentAdvisor.getPrimaryMoveReadLock();
-    }
-    return null;
-  }
-
   /**
    * Try to lock the primary bucket to make sure no operation is on-going at current bucket.
    *
@@ -309,7 +305,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
    * Caller must synchronize on this BucketAdvisor.
    *
    */
-  private void deposePrimaryForColocatedChildren() {
+  void deposePrimaryForColocatedChildren() {
     boolean deposedChildPrimaries = true;
     List<PartitionedRegion> colocatedChildPRs = ColocationHelper.getColocatedChildRegions(pRegion);
     if (colocatedChildPRs != null) {
@@ -845,7 +841,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
    *
    * @param member the member who is not primary
    */
-  private void removePrimary(InternalDistributedMember member) {
+  void removePrimary(InternalDistributedMember member) {
     boolean needToVolunteerForPrimary = false;
     if (!isClosed()) { // hole: requestPrimaryState not hosting
       initializationGate();
@@ -896,9 +892,10 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
           ((BucketRegion) br).beforeReleasingPrimaryLockDuringDemotion();
         }
 
-        releasePrimaryLock();
         // this was a deposePrimary call so we need to depose children as well
         deposePrimaryForColocatedChildren();
+        releasePrimaryLock();
+
         if (pRegion.isFixedPartitionedRegion()) {
           deposeOtherPrimaryBucketForFixedPartition();
         }
@@ -1688,6 +1685,11 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
     notifyAll(); // wake up any threads in waitForPrimaryMember
   }
 
+  @TestOnly
+  void setPrimaryMemberForTest(InternalDistributedMember member) {
+    primaryMember.set(member);
+  }
+
   void setHadPrimary() {
     everHadPrimary = true;
   }
@@ -1809,7 +1811,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
   /**
    * Releases the primary lock for this bucket.
    */
-  private void releasePrimaryLock() {
+  void releasePrimaryLock() {
     // We don't have a lock if we have a parent advisor
     if (parentAdvisor != null) {
       return;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 421a854498..ffb1109115 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -799,7 +799,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   }
 
   /**
-   * lock this bucket and, if present, its colocated "parent"
+   * lock this bucket
    *
    * @param tryLock - whether to use tryLock (true) or a blocking lock (false)
    * @return true if locks were obtained and are still held
@@ -832,41 +832,20 @@ public class BucketRegion extends DistributedRegion implements Bucket {
 
   private boolean lockPrimaryStateReadLock(boolean tryLock) {
     Lock primaryMoveReadLock = getBucketAdvisor().getPrimaryMoveReadLock();
-    Lock parentLock = getBucketAdvisor().getParentPrimaryMoveReadLock();
     for (;;) {
       boolean interrupted = Thread.interrupted();
       try {
         // Get the lock. If we have to wait here, it's because
         // this VM is actively becoming "not primary". We don't want
         // to throw an exception until this VM is actually no longer
-        // primary, so we wait here for not primary to complete. See bug #39963
-        if (parentLock != null) {
-          if (tryLock) {
-            boolean locked = parentLock.tryLock();
-            if (!locked) {
-              return false;
-            }
-          } else {
-            parentLock.lockInterruptibly();
-          }
-          if (tryLock) {
-            boolean locked = primaryMoveReadLock.tryLock();
-            if (!locked) {
-              parentLock.unlock();
-              return false;
-            }
-          } else {
-            primaryMoveReadLock.lockInterruptibly();
+        // primary, so we wait here for not primary to complete.
+        if (tryLock) {
+          boolean locked = primaryMoveReadLock.tryLock();
+          if (!locked) {
+            return false;
           }
         } else {
-          if (tryLock) {
-            boolean locked = primaryMoveReadLock.tryLock();
-            if (!locked) {
-              return false;
-            }
-          } else {
-            primaryMoveReadLock.lockInterruptibly();
-          }
+          primaryMoveReadLock.lockInterruptibly();
         }
         break; // success
       } catch (InterruptedException e) {
@@ -886,10 +865,6 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   public void doUnlockForPrimary() {
     Lock primaryMoveReadLock = getBucketAdvisor().getPrimaryMoveReadLock();
     primaryMoveReadLock.unlock();
-    Lock parentLock = getBucketAdvisor().getParentPrimaryMoveReadLock();
-    if (parentLock != null) {
-      parentLock.unlock();
-    }
   }
 
   /**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
index 817386cb7f..9e7bb65ea2 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
@@ -19,11 +19,11 @@ import static org.apache.geode.cache.Region.SEPARATOR;
 import static org.apache.geode.internal.cache.CacheServerImpl.CACHE_SERVER_BIND_ADDRESS_NOT_AVAILABLE_EXCEPTION_MESSAGE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -33,8 +33,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
+import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
 
 import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.server.CacheServer;
@@ -43,10 +48,18 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.internal.cache.partitioned.Bucket;
 import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
 
-public class BucketAdvisorTest {
+class BucketAdvisorTest {
+  @Mock
+  private PartitionedRegion partitionedRegion;
+  @Mock
+  private Bucket bucket;
+  @Mock
+  private RegionAdvisor regionAdvisor;
+
+  private AutoCloseable closeable;
 
   @Test
-  public void shouldBeMockable() throws Exception {
+  void shouldBeMockable() throws Exception {
     BucketAdvisor mockBucketAdvisor = mock(BucketAdvisor.class);
     InternalDistributedMember mockInternalDistributedMember = mock(InternalDistributedMember.class);
 
@@ -58,7 +71,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void whenServerStopsAfterTheFirstIsRunningCheckThenItShouldNotBeAddedToLocations() {
+  void whenServerStopsAfterTheFirstIsRunningCheckThenItShouldNotBeAddedToLocations() {
     InternalCache mockCache = mock(InternalCache.class);
     ProxyBucketRegion mockBucket = mock(ProxyBucketRegion.class);
     RegionAdvisor mockRegionAdvisor = mock(RegionAdvisor.class);
@@ -86,7 +99,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void whenServerThrowsIllegalStateExceptionWithoutBindAddressMsgThenExceptionMustBeThrown() {
+  void whenServerThrowsIllegalStateExceptionWithoutBindAddressMsgThenExceptionMustBeThrown() {
     InternalCache mockCache = mock(InternalCache.class);
     ProxyBucketRegion mockBucket = mock(ProxyBucketRegion.class);
     RegionAdvisor mockRegionAdvisor = mock(RegionAdvisor.class);
@@ -114,7 +127,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void volunteerForPrimaryIgnoresMissingPrimaryElector() {
+  void volunteerForPrimaryIgnoresMissingPrimaryElector() {
     DistributionManager distributionManager = mock(DistributionManager.class);
     when(distributionManager.getId()).thenReturn(new InternalDistributedMember("localhost", 321));
 
@@ -153,12 +166,13 @@ public class BucketAdvisorTest {
         mock(BucketAdvisor.VolunteeringDelegate.class);
     advisorSpy.setVolunteeringDelegate(volunteeringDelegate);
     advisorSpy.initializePrimaryElector(missingElectorId);
-    assertEquals(missingElectorId, advisorSpy.getPrimaryElector());
+    assertThat(missingElectorId).isEqualTo(advisorSpy.getPrimaryElector());
     advisorSpy.volunteerForPrimary();
     verify(volunteeringDelegate).volunteerForPrimary();
   }
 
-  BucketAdvisor mockBucketAdvisorWithShadowBucketsDestroyedMap(Map<String, Boolean> shadowBuckets) {
+  BucketAdvisor mockBucketAdvisorWithShadowBucketsDestroyedMap(
+      Map<String, Boolean> shadowBuckets) {
     DistributionManager distributionManager = mock(DistributionManager.class);
     when(distributionManager.getId()).thenReturn(new InternalDistributedMember("localhost", 321));
 
@@ -180,7 +194,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void markAllShadowBucketsAsNonDestroyedShouldClearTheShadowBucketsDestroyedMap() {
+  void markAllShadowBucketsAsNonDestroyedShouldClearTheShadowBucketsDestroyedMap() {
     Map<String, Boolean> buckets = of(SEPARATOR + "b1", false, SEPARATOR + "b2", true);
     BucketAdvisor bucketAdvisor = mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
 
@@ -190,7 +204,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void markAllShadowBucketsAsDestroyedShouldSetTheFlagAsTrueForEveryKnownShadowBucket() {
+  void markAllShadowBucketsAsDestroyedShouldSetTheFlagAsTrueForEveryKnownShadowBucket() {
     Map<String, Boolean> buckets =
         of(SEPARATOR + "b1", false, SEPARATOR + "b2", false, SEPARATOR + "b3", false);
     BucketAdvisor bucketAdvisor = mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
@@ -201,7 +215,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void markShadowBucketAsDestroyedShouldSetTheFlagAsTrueOnlyForTheSpecificBucket() {
+  void markShadowBucketAsDestroyedShouldSetTheFlagAsTrueOnlyForTheSpecificBucket() {
     Map<String, Boolean> buckets = of(SEPARATOR + "b1", false);
     BucketAdvisor bucketAdvisor = mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
 
@@ -217,7 +231,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void isShadowBucketDestroyedShouldReturnCorrectly() {
+  void isShadowBucketDestroyedShouldReturnCorrectly() {
     Map<String, Boolean> buckets = of(SEPARATOR + "b1", true, SEPARATOR + "b2", false);
     BucketAdvisor bucketAdvisor = mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
 
@@ -230,7 +244,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void testGetAllHostingMembersReturnsNoMembersWhenBucketAdvisorHasNoProfiles() {
+  void testGetAllHostingMembersReturnsNoMembersWhenBucketAdvisorHasNoProfiles() {
     DistributionManager distributionManager = mock(DistributionManager.class);
     when(distributionManager.getId()).thenReturn(new InternalDistributedMember("localhost", 321));
 
@@ -252,7 +266,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasOneProfileWithHostingBucket() {
+  void testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasOneProfileWithHostingBucket() {
     DistributionManager distributionManager = mock(DistributionManager.class);
     InternalDistributedMember memberId = new InternalDistributedMember("localhost", 321);
 
@@ -282,7 +296,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasTwoProfilesAndOneIsHostingBucket() {
+  void testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasTwoProfilesAndOneIsHostingBucket() {
     DistributionManager distributionManager = mock(DistributionManager.class);
     InternalDistributedMember memberId = new InternalDistributedMember("localhost", 321);
     InternalDistributedMember memberId2 = new InternalDistributedMember("localhost", 323);
@@ -317,4 +331,38 @@ public class BucketAdvisorTest {
 
     assertThat(bucketAdvisor.adviseInitialized().size()).isEqualTo(1);
   }
+
+  @Test
+  void removePrimaryDeposePrimaryForColocatedChildrenBeforeReleasePrimaryLock() {
+    when(regionAdvisor.getPartitionedRegion()).thenReturn(partitionedRegion);
+    when(regionAdvisor.getBucket(any(Integer.class))).thenReturn(bucket);
+    when(partitionedRegion.getPartitionAttributes()).thenReturn(new PartitionAttributesImpl());
+    when(bucket.getDistributionManager()).thenReturn(mock(DistributionManager.class));
+    BucketAdvisor bucketAdvisor = spy(BucketAdvisor.createBucketAdvisor(bucket, regionAdvisor));
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    DistributionManager manager = mock(DistributionManager.class);
+    doReturn(true).when(bucketAdvisor).isPrimary();
+    doReturn(manager).when(bucketAdvisor).getDistributionManager();
+    when(manager.getId()).thenReturn(member);
+    bucketAdvisor.setPrimaryMemberForTest(member);
+    bucketAdvisor.setInitialized();
+    doNothing().when(bucketAdvisor).deposePrimaryForColocatedChildren();
+
+    InOrder order = inOrder(bucketAdvisor);
+    bucketAdvisor.removePrimary(member);
+
+    order.verify(bucketAdvisor).deposePrimaryForColocatedChildren();
+    order.verify(bucketAdvisor).releasePrimaryLock();
+    assertThat(bucketAdvisor.basicGetPrimaryMember()).isNull();
+  }
+
+  @BeforeEach
+  void init() {
+    closeable = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterEach
+  void close() throws Exception {
+    closeable.close();
+  }
 }