You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2021/04/30 01:16:53 UTC

[geode] 15/17: GEODE-9132: Always acquire write lock for PR clear

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 30f5e90c8e116e90adeec6e2ad847c5427b50942
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Wed Apr 21 15:24:06 2021 -0700

    GEODE-9132: Always acquire write lock for PR clear
---
 ...gionClearWithConcurrentOperationsDUnitTest.java | 110 ++++++++---------
 .../internal/cache/PartitionedRegionClear.java     | 115 ++++++++++++------
 .../internal/cache/PartitionedRegionClearTest.java | 131 +++++++++++++++++----
 3 files changed, 246 insertions(+), 110 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
index 7ef187f..710ae6f 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
@@ -14,6 +14,9 @@
  */
 package org.apache.geode.internal.cache;
 
+import static java.time.Duration.ofMillis;
+import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT;
 import static org.apache.geode.internal.util.ArrayUtils.asList;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.apache.geode.test.dunit.VM.getVM;
@@ -21,6 +24,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.Serializable;
+import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -73,11 +77,16 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
 
   private static final int BUCKETS = 13;
   private static final String REGION_NAME = "PartitionedRegion";
+  private static final Duration WORK_DURATION = Duration.ofSeconds(15);
 
   @Rule
   public DistributedRule distributedRule = new DistributedRule(3);
   @Rule
-  public CacheRule cacheRule = CacheRule.builder().createCacheInAll().build();
+  public CacheRule cacheRule = CacheRule.builder()
+      .addSystemProperty(MAX_WAIT_TIME_RECONNECT, "1000")
+      .addSystemProperty(MEMBER_TIMEOUT, "2000")
+      .createCacheInAll()
+      .build();
 
   private VM server1;
   private VM server2;
@@ -106,15 +115,15 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
     createRegions(regionShortcut);
 
     // Let all VMs continuously execute puts and gets for 60 seconds.
-    final int workMillis = 60000;
+    // final int workMillis = 60000;
     final int entries = 15000;
     List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
-        server1.invokeAsync(() -> executePuts(entries, workMillis)),
-        server2.invokeAsync(() -> executeGets(entries, workMillis)),
-        accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
+        server1.invokeAsync(() -> executePuts(entries, WORK_DURATION)),
+        server2.invokeAsync(() -> executeGets(entries, WORK_DURATION)),
+        accessor.invokeAsync(() -> executeRemoves(entries, WORK_DURATION)));
 
     // Clear the region every second for 60 seconds.
-    getVM(coordinatorVM.getVmId()).invoke(() -> executeClears(workMillis, 1000));
+    getVM(coordinatorVM.getVmId()).invoke(() -> executeClears(WORK_DURATION, ofMillis(1000)));
 
     // Let asyncInvocations finish.
     for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
@@ -142,17 +151,17 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
     createRegions(regionShortcut);
 
     // Let all VMs continuously execute putAll and removeAll for 15 seconds.
-    final int workMillis = 15000;
+    // final int workMillis = 15000;
     List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
-        server1.invokeAsync(() -> executePutAlls(0, 2000, workMillis)),
-        server1.invokeAsync(() -> executeRemoveAlls(0, 2000, workMillis)),
-        server2.invokeAsync(() -> executePutAlls(2000, 4000, workMillis)),
-        server2.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)),
-        accessor.invokeAsync(() -> executePutAlls(4000, 6000, workMillis)),
-        accessor.invokeAsync(() -> executeRemoveAlls(4000, 6000, workMillis)));
+        server1.invokeAsync(() -> executePutAlls(0, 2000, WORK_DURATION)),
+        server1.invokeAsync(() -> executeRemoveAlls(0, 2000, WORK_DURATION)),
+        server2.invokeAsync(() -> executePutAlls(2000, 4000, WORK_DURATION)),
+        server2.invokeAsync(() -> executeRemoveAlls(2000, 4000, WORK_DURATION)),
+        accessor.invokeAsync(() -> executePutAlls(4000, 6000, WORK_DURATION)),
+        accessor.invokeAsync(() -> executeRemoveAlls(4000, 6000, WORK_DURATION)));
 
     // Clear the region every half second for 15 seconds.
-    getVM(coordinatorVM.getVmId()).invoke(() -> executeClears(workMillis, 500));
+    getVM(coordinatorVM.getVmId()).invoke(() -> executeClears(WORK_DURATION, ofMillis(500)));
 
     // Let asyncInvocations finish.
     for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
@@ -226,12 +235,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
     server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
 
     // Let all VMs (except the one to kill) continuously execute gets, put and removes for 30"
-    final int workMillis = 30000;
+    // final int workMillis = 30000;
     List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
-        server1.invokeAsync(() -> executeGets(entries, workMillis)),
-        server1.invokeAsync(() -> executePuts(entries, workMillis)),
-        accessor.invokeAsync(() -> executeGets(entries, workMillis)),
-        accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
+        server1.invokeAsync(() -> executeGets(entries, WORK_DURATION)),
+        server1.invokeAsync(() -> executePuts(entries, WORK_DURATION)),
+        accessor.invokeAsync(() -> executeGets(entries, WORK_DURATION)),
+        accessor.invokeAsync(() -> executeRemoves(entries, WORK_DURATION)));
 
     // Retry the clear operation on the region until success (server2 will go down, but other
     // members will eventually become primary for those buckets previously hosted by server2).
@@ -278,12 +287,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
     server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
 
     // Let all VMs (except the one to kill) continuously execute gets, put and removes for 30"
-    final int workMillis = 30000;
+    // final int workMillis = 30000;
     List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
-        server1.invokeAsync(() -> executeGets(entries, workMillis)),
-        server1.invokeAsync(() -> executePuts(entries, workMillis)),
-        accessor.invokeAsync(() -> executeGets(entries, workMillis)),
-        accessor.invokeAsync(() -> executeRemoves(entries, workMillis)));
+        server1.invokeAsync(() -> executeGets(entries, WORK_DURATION)),
+        server1.invokeAsync(() -> executePuts(entries, WORK_DURATION)),
+        accessor.invokeAsync(() -> executeGets(entries, WORK_DURATION)),
+        accessor.invokeAsync(() -> executeRemoves(entries, WORK_DURATION)));
 
     // Clear the region.
     getVM(coordinatorVM.getVmId()).invoke(() -> {
@@ -315,10 +324,10 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
     server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
 
     // Let all VMs continuously execute putAll/removeAll for 30 seconds.
-    final int workMillis = 30000;
+    // final int workMillis = 30000;
     List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
-        server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)),
-        accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
+        server1.invokeAsync(() -> executePutAlls(0, 6000, WORK_DURATION)),
+        accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, WORK_DURATION)));
 
     // Retry the clear operation on the region until success (server2 will go down, but other
     // members will eventually become primary for those buckets previously hosted by server2).
@@ -360,10 +369,9 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
     createRegions(RegionShortcut.PARTITION);
     server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false)));
 
-    final int workMillis = 30000;
     List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
-        server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)),
-        accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)));
+        server1.invokeAsync(() -> executePutAlls(0, 6000, WORK_DURATION)),
+        accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, WORK_DURATION)));
 
     // Clear the region.
     getVM(coordinatorVM.getVmId()).invoke(() -> {
@@ -520,12 +528,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   }
 
   /**
-   * Continuously execute get operations on the PartitionedRegion for the given durationInMillis.
+   * Continuously execute get operations on the PartitionedRegion for the given duration.
    */
-  private void executeGets(final int numEntries, final long durationInMillis) {
+  private void executeGets(final int numEntries, final Duration duration) {
     Cache cache = cacheRule.getCache();
     Region<String, String> region = cache.getRegion(REGION_NAME);
-    Instant finishTime = Instant.now().plusMillis(durationInMillis);
+    Instant finishTime = Instant.now().plusMillis(duration.toMillis());
 
     while (Instant.now().isBefore(finishTime)) {
       // Region might have been cleared in between, that's why we check for null.
@@ -537,12 +545,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   }
 
   /**
-   * Continuously execute put operations on the PartitionedRegion for the given durationInMillis.
+   * Continuously execute put operations on the PartitionedRegion for the given duration.
    */
-  private void executePuts(final int numEntries, final long durationInMillis) {
+  private void executePuts(final int numEntries, final Duration duration) {
     Cache cache = cacheRule.getCache();
     Region<String, String> region = cache.getRegion(REGION_NAME);
-    Instant finishTime = Instant.now().plusMillis(durationInMillis);
+    Instant finishTime = Instant.now().plusMillis(duration.toMillis());
 
     while (Instant.now().isBefore(finishTime)) {
       IntStream.range(0, numEntries).forEach(i -> region.put(String.valueOf(i), "Value_" + i));
@@ -550,16 +558,15 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   }
 
   /**
-   * Continuously execute putAll operations on the PartitionedRegion for the given
-   * durationInMillis.
+   * Continuously execute putAll operations on the PartitionedRegion for the given duration.
    */
-  private void executePutAlls(final int startKey, final int finalKey, final long durationInMillis) {
+  private void executePutAlls(final int startKey, final int finalKey, final Duration duration) {
     Cache cache = cacheRule.getCache();
     Map<String, String> valuesToInsert = new HashMap<>();
     Region<String, String> region = cache.getRegion(REGION_NAME);
     IntStream.range(startKey, finalKey)
         .forEach(i -> valuesToInsert.put(String.valueOf(i), "Value_" + i));
-    Instant finishTime = Instant.now().plusMillis(durationInMillis);
+    Instant finishTime = Instant.now().plusMillis(duration.toMillis());
 
     while (Instant.now().isBefore(finishTime)) {
       region.putAll(valuesToInsert);
@@ -567,13 +574,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   }
 
   /**
-   * Continuously execute remove operations on the PartitionedRegion for the given
-   * durationInMillis.
+   * Continuously execute remove operations on the PartitionedRegion for the given duration.
    */
-  private void executeRemoves(final int numEntries, final long durationInMillis) {
+  private void executeRemoves(final int numEntries, final Duration duration) {
     Cache cache = cacheRule.getCache();
     Region<String, String> region = cache.getRegion(REGION_NAME);
-    Instant finishTime = Instant.now().plusMillis(durationInMillis);
+    Instant finishTime = Instant.now().plusMillis(duration.toMillis());
 
     while (Instant.now().isBefore(finishTime)) {
       // Region might have been cleared in between, that's why we check for null.
@@ -585,16 +591,14 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   }
 
   /**
-   * Continuously execute removeAll operations on the PartitionedRegion for the given
-   * durationInMillis.
+   * Continuously execute removeAll operations on the PartitionedRegion for the given duration.
    */
-  private void executeRemoveAlls(final int startKey, final int finalKey,
-      final long durationInMillis) {
+  private void executeRemoveAlls(final int startKey, final int finalKey, final Duration duration) {
     Cache cache = cacheRule.getCache();
     List<String> keysToRemove = new ArrayList<>();
     Region<String, String> region = cache.getRegion(REGION_NAME);
     IntStream.range(startKey, finalKey).forEach(i -> keysToRemove.add(String.valueOf(i)));
-    Instant finishTime = Instant.now().plusMillis(durationInMillis);
+    Instant finishTime = Instant.now().plusMillis(duration.toMillis());
 
     while (Instant.now().isBefore(finishTime)) {
       region.removeAll(keysToRemove);
@@ -622,13 +626,13 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
   }
 
   /**
-   * Continuously execute clear operations on the PartitionedRegion every periodInMillis for the
-   * given durationInMillis.
+   * Continuously execute clear operations on the PartitionedRegion every period for the
+   * given duration.
    */
-  private void executeClears(final long durationInMillis, final long periodInMillis) {
+  private void executeClears(final Duration duration, final Duration period) {
     Cache cache = cacheRule.getCache();
     Region<String, String> region = cache.getRegion(REGION_NAME);
-    long minimumInvocationCount = durationInMillis / periodInMillis;
+    long minimumInvocationCount = duration.toMillis() / period.toMillis();
 
     for (int invocationCount = 0; invocationCount < minimumInvocationCount; invocationCount++) {
       region.clear();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
index 569f78c..b8597c1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
@@ -23,6 +23,7 @@ import java.util.Set;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.CacheWriterException;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.OperationAbortedException;
@@ -30,10 +31,12 @@ import org.apache.geode.cache.PartitionedRegionPartialClearException;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.MembershipListener;
 import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.PartitionedRegionClearMessage.PartitionedRegionClearResponse;
 import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
@@ -47,6 +50,8 @@ public class PartitionedRegionClear {
 
   private final PartitionedRegion partitionedRegion;
 
+  private final DistributedLockService distributedLockService;
+
   protected final LockForListenerAndClientNotification lockForListenerAndClientNotification =
       new LockForListenerAndClientNotification();
 
@@ -55,8 +60,35 @@ public class PartitionedRegionClear {
   protected final PartitionedRegionClearListener partitionedRegionClearListener =
       new PartitionedRegionClearListener();
 
+  private final ColocationLeaderRegionProvider colocationLeaderRegionProvider;
+  private final AssignBucketsToPartitions assignBucketsToPartitions;
+  private final UpdateAttributesProcessorFactory updateAttributesProcessorFactory;
+
   public PartitionedRegionClear(PartitionedRegion partitionedRegion) {
+    this(partitionedRegion,
+        partitionedRegion.getPartitionedRegionLockService(),
+        ColocationHelper::getLeaderRegion,
+        PartitionRegionHelper::assignBucketsToPartitions,
+        pr -> new UpdateAttributesProcessor(pr, true));
+  }
+
+  @VisibleForTesting
+  PartitionedRegionClear(PartitionedRegion partitionedRegion,
+      DistributedLockService distributedLockService,
+      ColocationLeaderRegionProvider colocationLeaderRegionProvider,
+      AssignBucketsToPartitions assignBucketsToPartitions,
+      UpdateAttributesProcessorFactory updateAttributesProcessorFactory) {
     this.partitionedRegion = partitionedRegion;
+    this.distributedLockService = distributedLockService;
+    this.colocationLeaderRegionProvider = colocationLeaderRegionProvider;
+    this.assignBucketsToPartitions = assignBucketsToPartitions;
+    this.updateAttributesProcessorFactory = updateAttributesProcessorFactory;
+
+    // TODO: initialize needs to move out of constructor to prevent escape of reference to 'this'
+    initialize();
+  }
+
+  private void initialize() {
     partitionedRegion.getDistributionManager()
         .addMembershipListener(partitionedRegionClearListener);
   }
@@ -67,7 +99,7 @@ public class PartitionedRegionClear {
 
   void acquireDistributedClearLock(String clearLock) {
     try {
-      partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1);
+      distributedLockService.lock(clearLock, -1, -1);
     } catch (IllegalStateException e) {
       partitionedRegion.lockCheckReadiness();
       throw e;
@@ -76,7 +108,7 @@ public class PartitionedRegionClear {
 
   void releaseDistributedClearLock(String clearLock) {
     try {
-      partitionedRegion.getPartitionedRegionLockService().unlock(clearLock);
+      distributedLockService.unlock(clearLock);
     } catch (IllegalStateException e) {
       partitionedRegion.lockCheckReadiness();
     } catch (Exception ex) {
@@ -303,7 +335,7 @@ public class PartitionedRegionClear {
   protected Set<Integer> attemptToSendPartitionedRegionClearMessage(RegionEventImpl event,
       PartitionedRegionClearMessage.OperationType op)
       throws ForceReattemptException {
-    Set<Integer> bucketsOperated = new HashSet<>();
+    Set<Integer> clearedBuckets = new HashSet<>();
 
     if (partitionedRegion.getPRRoot() == null) {
       if (logger.isDebugEnabled()) {
@@ -311,8 +343,10 @@ public class PartitionedRegionClear {
             "Partition region {} failed to initialize. Remove its profile from remote members.",
             this.partitionedRegion);
       }
-      new UpdateAttributesProcessor(partitionedRegion, true).distribute(false);
-      return bucketsOperated;
+      updateAttributesProcessorFactory
+          .create(partitionedRegion)
+          .distribute(false);
+      return clearedBuckets;
     }
 
     final Set<InternalDistributedMember> configRecipients =
@@ -325,9 +359,9 @@ public class PartitionedRegionClear {
 
       if (prConfig != null) {
         for (Node node : prConfig.getNodes()) {
-          InternalDistributedMember idm = node.getMemberId();
-          if (!idm.equals(partitionedRegion.getMyId())) {
-            configRecipients.add(idm);
+          InternalDistributedMember memberId = node.getMemberId();
+          if (!memberId.equals(partitionedRegion.getMyId())) {
+            configRecipients.add(memberId);
           }
         }
       }
@@ -336,29 +370,29 @@ public class PartitionedRegionClear {
     }
 
     try {
-      PartitionedRegionClearMessage.PartitionedRegionClearResponse resp =
-          new PartitionedRegionClearMessage.PartitionedRegionClearResponse(
-              partitionedRegion.getSystem(), configRecipients);
-      PartitionedRegionClearMessage partitionedRegionClearMessage =
-          new PartitionedRegionClearMessage(configRecipients, partitionedRegion, resp, op, event);
-      partitionedRegionClearMessage.send();
+      PartitionedRegionClearResponse clearResponse =
+          new PartitionedRegionClearResponse(partitionedRegion.getSystem(), configRecipients);
+      PartitionedRegionClearMessage clearMessage =
+          new PartitionedRegionClearMessage(configRecipients, partitionedRegion, clearResponse, op,
+              event);
+      clearMessage.send();
 
-      resp.waitForRepliesUninterruptibly();
-      bucketsOperated = resp.bucketsCleared;
+      clearResponse.waitForRepliesUninterruptibly();
+      clearedBuckets = clearResponse.bucketsCleared;
 
     } catch (ReplyException e) {
-      Throwable t = e.getCause();
-      if (t instanceof ForceReattemptException) {
-        throw (ForceReattemptException) t;
+      Throwable cause = e.getCause();
+      if (cause instanceof ForceReattemptException) {
+        throw (ForceReattemptException) cause;
       }
-      if (t instanceof PartitionedRegionPartialClearException) {
-        throw new PartitionedRegionPartialClearException(t.getMessage(), t);
+      if (cause instanceof PartitionedRegionPartialClearException) {
+        throw (PartitionedRegionPartialClearException) cause;
       }
       logger.warn(
           "PartitionedRegionClear#sendPartitionedRegionClearMessage: Caught exception during ClearRegionMessage send and waiting for response",
           e);
     }
-    return bucketsOperated;
+    return clearedBuckets;
   }
 
   /**
@@ -412,14 +446,9 @@ public class PartitionedRegionClear {
         invokeCacheWriter(regionEvent);
       }
 
-      // Check if there are any listeners or clients interested. If so, then clear write
-      // locks needs to be taken on all local and remote primary buckets in order to
-      // preserve the ordering of client events (for concurrent operations on the region).
-      boolean acquireClearLockForNotification =
-          (partitionedRegion.hasAnyClientsInterested() || partitionedRegion.hasListener());
-      if (acquireClearLockForNotification) {
-        obtainLockForClear(regionEvent);
-      }
+      // clear write locks need to be taken on all local and remote primary buckets
+      // whether or not the partitioned region has any listeners clients interested
+      obtainLockForClear(regionEvent);
       try {
         Set<Integer> bucketsCleared = clearRegion(regionEvent);
 
@@ -435,9 +464,7 @@ public class PartitionedRegionClear {
           throw new PartitionedRegionPartialClearException(message);
         }
       } finally {
-        if (acquireClearLockForNotification) {
-          releaseLockForClear(regionEvent);
-        }
+        releaseLockForClear(regionEvent);
       }
     } finally {
       releaseDistributedClearLock(lockName);
@@ -458,8 +485,8 @@ public class PartitionedRegionClear {
   }
 
   protected void assignAllPrimaryBuckets() {
-    PartitionedRegion leader = ColocationHelper.getLeaderRegion(partitionedRegion);
-    PartitionRegionHelper.assignBucketsToPartitions(leader);
+    PartitionedRegion leader = colocationLeaderRegionProvider.getLeaderRegion(partitionedRegion);
+    assignBucketsToPartitions.assignBucketsToPartitions(leader);
   }
 
   protected void handleClearFromDepartedMember(InternalDistributedMember departedMember) {
@@ -505,6 +532,24 @@ public class PartitionedRegionClear {
     return membershipChange;
   }
 
+  @FunctionalInterface
+  @VisibleForTesting
+  interface ColocationLeaderRegionProvider {
+    PartitionedRegion getLeaderRegion(PartitionedRegion partitionedRegion);
+  }
+
+  @FunctionalInterface
+  @VisibleForTesting
+  interface AssignBucketsToPartitions {
+    void assignBucketsToPartitions(PartitionedRegion partitionedRegion);
+  }
+
+  @FunctionalInterface
+  @VisibleForTesting
+  interface UpdateAttributesProcessorFactory {
+    UpdateAttributesProcessor create(PartitionedRegion partitionedRegion);
+  }
+
   protected class PartitionedRegionClearListener implements MembershipListener {
 
     @Override
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java
index 721d236..376fc8e 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java
@@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.catchThrowable;
 import static org.mockito.ArgumentCaptor.forClass;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
@@ -42,7 +43,6 @@ import org.mockito.ArgumentCaptor;
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.cache.PartitionedRegionPartialClearException;
 import org.apache.geode.cache.Region;
-import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
 import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionManager;
@@ -50,41 +50,74 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.MembershipListener;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.PartitionedRegion.RetryTimeKeeper;
+import org.apache.geode.internal.cache.PartitionedRegionClear.AssignBucketsToPartitions;
+import org.apache.geode.internal.cache.PartitionedRegionClear.ColocationLeaderRegionProvider;
 import org.apache.geode.internal.cache.PartitionedRegionClear.PartitionedRegionClearListener;
+import org.apache.geode.internal.cache.PartitionedRegionClear.UpdateAttributesProcessorFactory;
 import org.apache.geode.internal.cache.PartitionedRegionClearMessage.OperationType;
 import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
 import org.apache.geode.internal.serialization.KnownVersion;
 
 public class PartitionedRegionClearTest {
 
-  private GemFireCacheImpl cache;
-  private HashSet<AsyncEventQueue> allAEQs = new HashSet<>();
   private PartitionedRegionClear partitionedRegionClear;
   private DistributionManager distributionManager;
   private PartitionedRegion partitionedRegion;
   private RegionAdvisor regionAdvisor;
   private InternalDistributedMember internalDistributedMember;
+  private DistributedLockService distributedLockService;
 
   @Before
   public void setUp() {
-
-    cache = mock(GemFireCacheImpl.class);
+    AssignBucketsToPartitions assignBucketsToPartitions = mock(AssignBucketsToPartitions.class);
+    GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
+    ColocationLeaderRegionProvider colocationLeaderRegionProvider =
+        mock(ColocationLeaderRegionProvider.class);
+    distributedLockService = mock(DistributedLockService.class);
     distributionManager = mock(DistributionManager.class);
+    FilterProfile filterProfile = mock(FilterProfile.class);
     internalDistributedMember = mock(InternalDistributedMember.class);
     partitionedRegion = mock(PartitionedRegion.class);
     regionAdvisor = mock(RegionAdvisor.class);
+    UpdateAttributesProcessorFactory updateAttributesProcessorFactory =
+        mock(UpdateAttributesProcessorFactory.class);
+
+    when(cache.getAsyncEventQueues(false))
+        .thenReturn(emptySet());
+    when(colocationLeaderRegionProvider.getLeaderRegion(any()))
+        .thenReturn(partitionedRegion);
+    when(distributedLockService.lock(anyString(), anyInt(), anyInt()))
+        .thenReturn(true);
+    when(distributionManager.getDistributionManagerId())
+        .thenReturn(internalDistributedMember);
+    when(distributionManager.getId())
+        .thenReturn(internalDistributedMember);
+    when(internalDistributedMember.getVersion())
+        .thenReturn(KnownVersion.CURRENT);
+    when(partitionedRegion.getCache())
+        .thenReturn(cache);
+    when(partitionedRegion.getDistributionManager())
+        .thenReturn(distributionManager);
+    when(partitionedRegion.getName())
+        .thenReturn("prRegion");
+    when(partitionedRegion.getRegionAdvisor())
+        .thenReturn(regionAdvisor);
+    when(partitionedRegion.getFilterProfile())
+        .thenReturn(filterProfile);
+    when(filterProfile.getFilterRoutingInfoPart1(any(), any(), any()))
+        .thenReturn(mock(FilterRoutingInfo.class));
+    when(filterProfile.getFilterRoutingInfoPart2(any(), any()))
+        .thenReturn(mock(FilterRoutingInfo.class));
+    when(regionAdvisor.getDistributionManager())
+        .thenReturn(distributionManager);
+    when(updateAttributesProcessorFactory.create(any()))
+        .thenReturn(mock(UpdateAttributesProcessor.class));
 
-    when(distributionManager.getDistributionManagerId()).thenReturn(internalDistributedMember);
-    when(distributionManager.getId()).thenReturn(internalDistributedMember);
-    when(internalDistributedMember.getVersion()).thenReturn(KnownVersion.CURRENT);
-    when(partitionedRegion.getCache()).thenReturn(cache);
-    when(cache.getAsyncEventQueues(false)).thenReturn(allAEQs);
-    when(partitionedRegion.getDistributionManager()).thenReturn(distributionManager);
-    when(partitionedRegion.getName()).thenReturn("prRegion");
-    when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor);
-    when(regionAdvisor.getDistributionManager()).thenReturn(distributionManager);
+    doNothing().when(distributedLockService).unlock(anyString());
 
-    partitionedRegionClear = new PartitionedRegionClear(partitionedRegion);
+    partitionedRegionClear = new PartitionedRegionClear(partitionedRegion, distributedLockService,
+        colocationLeaderRegionProvider, assignBucketsToPartitions,
+        updateAttributesProcessorFactory);
   }
 
   @Test
@@ -115,9 +148,7 @@ public class PartitionedRegionClearTest {
   @Test
   public void acquireDistributedClearLockGetsDistributedLock() {
     // arrange
-    DistributedLockService distributedLockService = mock(DistributedLockService.class);
     String lockName = PartitionedRegionClear.CLEAR_OPERATION + partitionedRegion.getName();
-    when(partitionedRegion.getPartitionedRegionLockService()).thenReturn(distributedLockService);
 
     // act
     partitionedRegionClear.acquireDistributedClearLock(lockName);
@@ -129,9 +160,7 @@ public class PartitionedRegionClearTest {
   @Test
   public void releaseDistributedClearLockReleasesDistributedLock() {
     // arrange
-    DistributedLockService distributedLockService = mock(DistributedLockService.class);
     String lockName = PartitionedRegionClear.CLEAR_OPERATION + partitionedRegion.getName();
-    when(partitionedRegion.getPartitionedRegionLockService()).thenReturn(distributedLockService);
 
     // act
     partitionedRegionClear.releaseDistributedClearLock(lockName);
@@ -567,6 +596,7 @@ public class PartitionedRegionClearTest {
   public void doClearAcquiresAndReleasesDistributedClearLockAndCreatesAllPrimaryBuckets() {
     // arrange
     RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    when(regionEvent.clone()).thenReturn(regionEvent);
 
     // partial mocking to stub some methods and verify
     PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
@@ -587,6 +617,7 @@ public class PartitionedRegionClearTest {
   public void doClearInvokesCacheWriterWhenCacheWriteIsSet() {
     // arrange
     RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    when(regionEvent.clone()).thenReturn(regionEvent);
 
     // partial mocking to stub some methods and verify
     PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
@@ -605,6 +636,7 @@ public class PartitionedRegionClearTest {
   public void doClearDoesNotInvokesCacheWriterWhenCacheWriteIsNotSet() {
     // arrange
     RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    when(regionEvent.clone()).thenReturn(regionEvent);
 
     // partial mocking to stub some methods and verify
     PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
@@ -669,12 +701,13 @@ public class PartitionedRegionClearTest {
   }
 
   @Test
-  public void doClearDoesNotObtainLockForClearWhenRegionHasNoListenerAndNoClientInterest() {
+  public void doClearObtainsLockForClearWhenRegionHasNoListenerAndNoClientInterest() {
     // arrange
     RegionEventImpl regionEvent = mock(RegionEventImpl.class);
 
     when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false);
     when(partitionedRegion.hasListener()).thenReturn(false);
+    when(regionEvent.clone()).thenReturn(regionEvent);
 
     // partial mocking to stub some methods and verify
     PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
@@ -688,8 +721,8 @@ public class PartitionedRegionClearTest {
     spyPartitionedRegionClear.doClear(regionEvent, false);
 
     // assert
-    verify(spyPartitionedRegionClear, never()).obtainLockForClear(regionEvent);
-    verify(spyPartitionedRegionClear, never()).releaseLockForClear(regionEvent);
+    verify(spyPartitionedRegionClear).obtainLockForClear(regionEvent);
+    verify(spyPartitionedRegionClear).releaseLockForClear(regionEvent);
   }
 
   @Test
@@ -867,6 +900,60 @@ public class PartitionedRegionClearTest {
         .isNotNull();
   }
 
+  @Test
+  public void doClearAcquiresLockForClearWhenHasAnyClientsInterestedIsTrue() {
+    // arrange
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    when(partitionedRegion.hasAnyClientsInterested()).thenReturn(true);
+    when(partitionedRegion.hasListener()).thenReturn(false);
+    when(regionEvent.clone()).thenReturn(regionEvent);
+
+    partitionedRegionClear = spy(partitionedRegionClear);
+    doNothing().when(partitionedRegionClear).obtainLockForClear(regionEvent);
+
+    // act
+    partitionedRegionClear.doClear(regionEvent, false);
+
+    // assert
+    verify(partitionedRegionClear).obtainLockForClear(regionEvent);
+  }
+
+  @Test
+  public void doClearAcquiresLockForClearWhenHasListenerIsTrue() {
+    // arrange
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false);
+    when(partitionedRegion.hasListener()).thenReturn(true);
+    when(regionEvent.clone()).thenReturn(regionEvent);
+
+    partitionedRegionClear = spy(partitionedRegionClear);
+    doNothing().when(partitionedRegionClear).obtainLockForClear(regionEvent);
+
+    // act
+    partitionedRegionClear.doClear(regionEvent, false);
+
+    // assert
+    verify(partitionedRegionClear).obtainLockForClear(regionEvent);
+  }
+
+  @Test
+  public void doClearAcquiresLockForClearWhenHasAnyClientsInterestedAndHasListenerAreFalse() {
+    // arrange
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false);
+    when(partitionedRegion.hasListener()).thenReturn(false);
+    when(regionEvent.clone()).thenReturn(regionEvent);
+
+    partitionedRegionClear = spy(partitionedRegionClear);
+    doNothing().when(partitionedRegionClear).obtainLockForClear(regionEvent);
+
+    // act
+    partitionedRegionClear.doClear(regionEvent, false);
+
+    // assert
+    verify(partitionedRegionClear).obtainLockForClear(regionEvent);
+  }
+
   private Set<BucketRegion> setupBucketRegions(
       PartitionedRegionDataStore dataStore,
       BucketAdvisor bucketAdvisor) {