You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2021/04/30 20:53:01 UTC

[geode] branch feature/GEODE-7665 updated: GEODE-9195: Remove PR clear local locking (#6410)

This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-7665 by this push:
     new e4a7b61  GEODE-9195: Remove PR clear local locking (#6410)
e4a7b61 is described below

commit e4a7b618ccccbf56d2582cfd11b6a86ea91b6e44
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Apr 30 13:51:33 2021 -0700

    GEODE-9195: Remove PR clear local locking (#6410)
    
    Unit test changes in BucketRegion and DistributedRegion.
    
    Unit test most of PartitionedRegionClearMessage.
---
 .../codeAnalysis/sanctionedDataSerializables.txt   |   2 +-
 .../apache/geode/internal/cache/BucketRegion.java  |  25 +-
 .../geode/internal/cache/DistributedRegion.java    |  29 ++-
 .../internal/cache/PartitionedRegionClear.java     |  15 +-
 .../cache/PartitionedRegionClearMessage.java       | 109 ++++++--
 .../geode/internal/cache/RegionEventFactory.java   |  30 +++
 .../internal/cache/BucketRegionJUnitTest.java      |  59 ++++-
 .../internal/cache/DistributedRegionTest.java      |  93 +++++--
 .../cache/PartitionedRegionClearMessageTest.java   | 285 +++++++++++++++++++++
 9 files changed, 561 insertions(+), 86 deletions(-)

diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index d1a8742..35d7a2b 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1076,7 +1076,7 @@ fromData,207
 toData,178
 
 org/apache/geode/internal/cache/PartitionedRegionClearMessage,2
-fromData,40
+fromData,49
 toData,36
 
 org/apache/geode/internal/cache/PartitionedRegionClearMessage$PartitionedRegionClearReplyMessage,2
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 49f6aad..18f2ef9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -37,6 +37,7 @@ import org.apache.geode.InternalGemFireError;
 import org.apache.geode.InvalidDeltaException;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.annotations.Immutable;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.CacheWriter;
 import org.apache.geode.cache.CacheWriterException;
@@ -577,11 +578,9 @@ public class BucketRegion extends DistributedRegion implements Bucket {
     // get rvvLock
     Set<InternalDistributedMember> participants =
         getCacheDistributionAdvisor().adviseInvalidateRegion();
-    boolean isLockedAlready = this.partitionedRegion.getPartitionedRegionClear()
-        .isLockedForListenerAndClientNotification();
 
     try {
-      obtainWriteLocksForClear(regionEvent, participants, isLockedAlready);
+      obtainWriteLocksForClear(regionEvent, participants);
       // no need to dominate my own rvv.
       // Clear is on going here, there won't be GII for this member
       clearRegionLocally(regionEvent, cacheWrite, null);
@@ -589,10 +588,28 @@ public class BucketRegion extends DistributedRegion implements Bucket {
 
       // TODO: call reindexUserDataRegion if there're lucene indexes
     } finally {
-      releaseWriteLocksForClear(regionEvent, participants, isLockedAlready);
+      releaseWriteLocksForClear(regionEvent, participants);
     }
   }
 
+  @Override
+  protected void obtainWriteLocksForClear(RegionEventImpl regionEvent,
+      Set<InternalDistributedMember> participants) {
+    lockAndFlushClearToOthers(regionEvent, participants);
+  }
+
+  @Override
+  protected void releaseWriteLocksForClear(RegionEventImpl regionEvent,
+      Set<InternalDistributedMember> participants) {
+    distributedClearOperationReleaseLocks(regionEvent, participants);
+  }
+
+  @VisibleForTesting
+  void distributedClearOperationReleaseLocks(RegionEventImpl regionEvent,
+      Set<InternalDistributedMember> participants) {
+    DistributedClearOperation.releaseLocks(regionEvent, participants);
+  }
+
   long generateTailKey() {
     long key = eventSeqNum.addAndGet(partitionedRegion.getTotalNumberOfBuckets());
     if (key < 0 || key % getPartitionedRegion().getTotalNumberOfBuckets() != getId()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 3d6df11..0f419ad 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2027,13 +2027,13 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
               getCacheDistributionAdvisor().adviseInvalidateRegion();
           // pause all generation of versions and flush from the other members to this one
           try {
-            obtainWriteLocksForClear(regionEvent, participants, false);
+            obtainWriteLocksForClear(regionEvent, participants);
             clearRegionLocally(regionEvent, cacheWrite, null);
             if (!regionEvent.isOriginRemote() && regionEvent.getOperation().isDistributed()) {
               distributeClearOperation(regionEvent, null, participants);
             }
           } finally {
-            releaseWriteLocksForClear(regionEvent, participants, false);
+            releaseWriteLocksForClear(regionEvent, participants);
           }
         } finally {
           distributedUnlockForClear();
@@ -2082,30 +2082,31 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
     }
   }
 
-
   /**
    * obtain locks preventing generation of new versions in other members
    */
   protected void obtainWriteLocksForClear(RegionEventImpl regionEvent,
-      Set<InternalDistributedMember> participants, boolean localLockedAlready) {
-    if (!localLockedAlready) {
-      lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent);
-    }
-    lockAndFlushClearToOthers(regionEvent, participants);
+      Set<InternalDistributedMember> recipients) {
+    lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent);
+    lockAndFlushClearToOthers(regionEvent, recipients);
   }
 
   /**
    * releases the locks obtained in obtainWriteLocksForClear
    */
   protected void releaseWriteLocksForClear(RegionEventImpl regionEvent,
-      Set<InternalDistributedMember> participants,
-      boolean localLockedAlready) {
-    if (!localLockedAlready) {
-      releaseLockLocallyForClear(regionEvent);
-    }
-    DistributedClearOperation.releaseLocks(regionEvent, participants);
+      Set<InternalDistributedMember> recipients) {
+    releaseLockLocallyForClear(regionEvent);
+    distributedClearOperationReleaseLocks(regionEvent, recipients);
   }
 
+  @VisibleForTesting
+  void distributedClearOperationReleaseLocks(RegionEventImpl regionEvent,
+      Set<InternalDistributedMember> recipients) {
+    DistributedClearOperation.releaseLocks(regionEvent, recipients);
+  }
+
+  @VisibleForTesting
   void lockAndFlushClearToOthers(RegionEventImpl regionEvent,
       Set<InternalDistributedMember> participants) {
     DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
index 5f4e589..8403306 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
@@ -36,6 +36,7 @@ import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.MembershipListener;
 import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.PartitionedRegionClearMessage.OperationType;
 import org.apache.geode.internal.cache.PartitionedRegionClearMessage.PartitionedRegionClearResponse;
 import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -141,8 +142,7 @@ public class PartitionedRegionClear {
    */
   void obtainLockForClear(RegionEventImpl event) {
     obtainClearLockLocal(partitionedRegion.getDistributionManager().getId());
-    sendPartitionedRegionClearMessage(event,
-        PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR);
+    sendPartitionedRegionClearMessage(event, OperationType.OP_LOCK_FOR_PR_CLEAR);
   }
 
   /**
@@ -150,8 +150,7 @@ public class PartitionedRegionClear {
    */
   void releaseLockForClear(RegionEventImpl event) {
     releaseClearLockLocal();
-    sendPartitionedRegionClearMessage(event,
-        PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR);
+    sendPartitionedRegionClearMessage(event, OperationType.OP_UNLOCK_FOR_PR_CLEAR);
   }
 
   /**
@@ -162,7 +161,7 @@ public class PartitionedRegionClear {
     Set<Integer> localPrimaryBuckets = clearRegionLocal(regionEvent);
     // this includes all remote primary buckets and their secondaries
     Set<Integer> remotePrimaryBuckets = sendPartitionedRegionClearMessage(regionEvent,
-        PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR);
+        OperationType.OP_PR_CLEAR);
 
     Set<Integer> allBucketsCleared = new HashSet<>();
     allBucketsCleared.addAll(localPrimaryBuckets);
@@ -332,7 +331,7 @@ public class PartitionedRegionClear {
   }
 
   protected Set<Integer> sendPartitionedRegionClearMessage(RegionEventImpl event,
-      PartitionedRegionClearMessage.OperationType op) {
+      OperationType op) {
     RegionEventImpl eventForLocalClear = (RegionEventImpl) event.clone();
     eventForLocalClear.setOperation(Operation.REGION_LOCAL_CLEAR);
 
@@ -349,7 +348,7 @@ public class PartitionedRegionClear {
    * @return buckets that are cleared. empty set if any exception happened
    */
   protected Set<Integer> attemptToSendPartitionedRegionClearMessage(RegionEventImpl event,
-      PartitionedRegionClearMessage.OperationType op)
+      OperationType op)
       throws ForceReattemptException {
     Set<Integer> clearedBuckets = new HashSet<>();
 
@@ -394,7 +393,7 @@ public class PartitionedRegionClear {
       clearMessage.send();
 
       clearResponse.waitForRepliesUninterruptibly();
-      clearedBuckets = clearResponse.bucketsCleared;
+      clearedBuckets = clearResponse.getBucketsCleared();
 
     } catch (ReplyException e) {
       Throwable cause = e.getCause();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
index cd33f78..c4c1ca5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
@@ -14,15 +14,19 @@
  */
 package org.apache.geode.internal.cache;
 
+import static java.util.Collections.unmodifiableSet;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Objects;
 import java.util.Set;
 
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.DataSerializer;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
@@ -52,22 +56,47 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
   private Object callbackArgument;
   private OperationType operationType;
   private EventID eventId;
-  private PartitionedRegion partitionedRegion;
   private Set<Integer> bucketsCleared;
+  private DistributionManager distributionManager;
+  private RegionEventFactory regionEventFactory;
 
   public PartitionedRegionClearMessage() {
     // nothing
   }
 
-  PartitionedRegionClearMessage(Set<InternalDistributedMember> recipients,
-      PartitionedRegion partitionedRegion, ReplyProcessor21 replyProcessor21,
-      PartitionedRegionClearMessage.OperationType operationType,
+  PartitionedRegionClearMessage(Collection<InternalDistributedMember> recipients,
+      PartitionedRegion partitionedRegion,
+      ReplyProcessor21 replyProcessor21,
+      OperationType operationType,
       final RegionEventImpl regionEvent) {
-    super(recipients, partitionedRegion.getPRId(), replyProcessor21);
-    this.partitionedRegion = partitionedRegion;
+    this(recipients,
+        partitionedRegion.getDistributionManager(),
+        partitionedRegion.getPRId(),
+        replyProcessor21,
+        operationType,
+        regionEvent.getRawCallbackArgument(),
+        regionEvent.getEventId(),
+        partitionedRegion.getCache().getTxManager().isDistributed(),
+        RegionEventImpl::new);
+  }
+
+  @VisibleForTesting
+  PartitionedRegionClearMessage(Collection<InternalDistributedMember> recipients,
+      DistributionManager distributionManager,
+      int partitionedRegionId,
+      ReplyProcessor21 replyProcessor21,
+      OperationType operationType,
+      Object callbackArgument,
+      EventID eventId,
+      boolean isTransactionDistributed,
+      RegionEventFactory regionEventFactory) {
+    super(recipients, partitionedRegionId, replyProcessor21);
+    setTransactionDistributed(isTransactionDistributed);
+    this.distributionManager = distributionManager;
     this.operationType = operationType;
-    callbackArgument = regionEvent.getRawCallbackArgument();
-    eventId = regionEvent.getEventId();
+    this.callbackArgument = callbackArgument;
+    this.eventId = eventId;
+    this.regionEventFactory = regionEventFactory;
   }
 
   @Override
@@ -82,8 +111,7 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
   public void send() {
     Objects.requireNonNull(getRecipients(), "ClearMessage NULL recipients set");
 
-    setTransactionDistributed(partitionedRegion.getCache().getTxManager().isDistributed());
-    partitionedRegion.getDistributionManager().putOutgoing(this);
+    distributionManager.putOutgoing(this);
   }
 
   @Override
@@ -108,15 +136,17 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
       return true;
     }
 
+    PartitionedRegionClear partitionedRegionClear = partitionedRegion.getPartitionedRegionClear();
+
     if (operationType == OperationType.OP_LOCK_FOR_PR_CLEAR) {
-      partitionedRegion.getPartitionedRegionClear().obtainClearLockLocal(getSender());
+      partitionedRegionClear.obtainClearLockLocal(getSender());
     } else if (operationType == OperationType.OP_UNLOCK_FOR_PR_CLEAR) {
-      partitionedRegion.getPartitionedRegionClear().releaseClearLockLocal();
+      partitionedRegionClear.releaseClearLockLocal();
     } else {
-      RegionEventImpl event =
-          new RegionEventImpl(partitionedRegion, Operation.REGION_CLEAR, callbackArgument, true,
+      RegionEventImpl event = (RegionEventImpl) regionEventFactory
+          .create(partitionedRegion, Operation.REGION_CLEAR, callbackArgument, true,
               partitionedRegion.getMyId(), getEventID());
-      bucketsCleared = partitionedRegion.getPartitionedRegionClear().clearRegionLocal(event);
+      bucketsCleared = partitionedRegionClear.clearRegionLocal(event);
     }
     return true;
   }
@@ -125,9 +155,9 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
   protected void appendFields(StringBuilder stringBuilder) {
     super.appendFields(stringBuilder);
     stringBuilder
-        .append(" cbArg=")
+        .append(" callbackArgument=")
         .append(callbackArgument)
-        .append(" op=")
+        .append(" operationType=")
         .append(operationType);
   }
 
@@ -141,8 +171,10 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
       throws IOException, ClassNotFoundException {
     super.fromData(in, context);
     callbackArgument = DataSerializer.readObject(in);
-    operationType = PartitionedRegionClearMessage.OperationType.values()[in.readByte()];
+    operationType = OperationType.values()[in.readByte()];
     eventId = DataSerializer.readObject(in);
+
+    regionEventFactory = RegionEventImpl::new;
   }
 
   @Override
@@ -160,21 +192,36 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
     if (partitionedRegion != null && startTime > 0) {
       partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime);
     }
-    PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage
+    PartitionedRegionClearReplyMessage
         .send(recipient, processorId, getReplySender(distributionManager), operationType,
             bucketsCleared, replyException);
   }
 
+  @VisibleForTesting
+  DistributionManager getDistributionManagerForTesting() {
+    return distributionManager;
+  }
+
+  @VisibleForTesting
+  Object getCallbackArgumentForTesting() {
+    return callbackArgument;
+  }
+
+  @VisibleForTesting
+  RegionEventFactory getRegionEventFactoryForTesting() {
+    return regionEventFactory;
+  }
+
   /**
    * The response on which to wait for all the replies. This response ignores any exceptions
    * received from the "far side"
    */
   public static class PartitionedRegionClearResponse extends ReplyProcessor21 {
 
-    CopyOnWriteHashSet<Integer> bucketsCleared = new CopyOnWriteHashSet<>();
+    private final Set<Integer> bucketsCleared = new CopyOnWriteHashSet<>();
 
     public PartitionedRegionClearResponse(InternalDistributedSystem system,
-        Set<InternalDistributedMember> recipients) {
+        Collection<InternalDistributedMember> recipients) {
       super(system, recipients);
     }
 
@@ -188,12 +235,15 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
       }
       process(message, true);
     }
+
+    Set<Integer> getBucketsCleared() {
+      return unmodifiableSet(bucketsCleared);
+    }
   }
 
   public static class PartitionedRegionClearReplyMessage extends ReplyMessage {
 
     private Set<Integer> bucketsCleared;
-
     private OperationType operationType;
 
     @Override
@@ -201,14 +251,17 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
       return true;
     }
 
-    public static void send(InternalDistributedMember recipient, int processorId,
-        ReplySender replySender, OperationType operationType, Set<Integer> bucketsCleared,
+    private static void send(InternalDistributedMember recipient,
+        int processorId,
+        ReplySender replySender,
+        OperationType operationType,
+        Set<Integer> bucketsCleared,
         ReplyException replyException) {
       Objects.requireNonNull(recipient, "partitionedRegionClearReplyMessage NULL reply message");
 
-      PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage replyMessage =
-          new PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage(processorId,
-              operationType, bucketsCleared, replyException);
+      PartitionedRegionClearReplyMessage replyMessage =
+          new PartitionedRegionClearReplyMessage(processorId, operationType, bucketsCleared,
+              replyException);
 
       replyMessage.setRecipient(recipient);
       replySender.putOutgoing(replyMessage);
@@ -260,7 +313,7 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
     public void fromData(DataInput in, DeserializationContext context)
         throws IOException, ClassNotFoundException {
       super.fromData(in, context);
-      operationType = PartitionedRegionClearMessage.OperationType.values()[in.readByte()];
+      operationType = OperationType.values()[in.readByte()];
       bucketsCleared = DataSerializer.readObject(in);
     }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventFactory.java
new file mode 100644
index 0000000..c759a44
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.distributed.DistributedMember;
+
+@FunctionalInterface
+public interface RegionEventFactory {
+
+  RegionEvent create(PartitionedRegion partitionedRegion,
+      Operation operation,
+      Object callbackArgument,
+      boolean originRemote,
+      DistributedMember distributedMember,
+      EventID eventId);
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
index 0d1cc87..c0a635f 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
@@ -14,11 +14,13 @@
  */
 package org.apache.geode.internal.cache;
 
+import static java.util.Collections.emptySet;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -29,6 +31,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -36,6 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.junit.Test;
 
 import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.versions.RegionVersionVector;
 import org.apache.geode.internal.statistics.StatisticsClock;
 
@@ -183,17 +187,6 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
   }
 
   @Test
-  public void obtainWriteLocksForClearInBRShouldDistribute() {
-    RegionEventImpl event = createClearRegionEvent();
-    BucketRegion region = (BucketRegion) event.getRegion();
-    doNothing().when(region).lockLocallyForClear(any(), any(), any());
-    doNothing().when(region).lockAndFlushClearToOthers(any(), any());
-    region.obtainWriteLocksForClear(event, null, false);
-    verify(region).lockLocallyForClear(any(), any(), eq(event));
-    verify(region).lockAndFlushClearToOthers(eq(event), eq(null));
-  }
-
-  @Test
   public void updateSizeToZeroOnClearBucketRegion() {
     RegionEventImpl event = createClearRegionEvent();
     BucketRegion region = (BucketRegion) event.getRegion();
@@ -211,4 +204,48 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
     long sizeAfterClear = region.getTotalBytes();
     assertEquals(0, sizeAfterClear);
   }
+
+  @Test
+  public void obtainWriteLocksForClearInBRShouldLockAndFlushToOthers() {
+    RegionEventImpl event = createClearRegionEvent();
+    BucketRegion region = (BucketRegion) event.getRegion();
+    doNothing().when(region).lockAndFlushClearToOthers(any(), any());
+    region.obtainWriteLocksForClear(event, null);
+    verify(region).lockAndFlushClearToOthers(eq(event), eq(null));
+  }
+
+  @Test
+  public void obtainWriteLocksForClear_invokes_lockAndFlushClearToOthers() {
+    Set<InternalDistributedMember> recipients = emptySet();
+    BucketRegion bucketRegion = bucketRegionForClearLocking();
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+
+    bucketRegion.obtainWriteLocksForClear(regionEvent, recipients);
+
+    verify(bucketRegion).lockAndFlushClearToOthers(regionEvent, recipients);
+  }
+
+  @Test
+  public void releaseWriteLocksForClear_invokes_distributedClearOperationReleaseLocks() {
+    Set<InternalDistributedMember> recipients = emptySet();
+    BucketRegion bucketRegion = bucketRegionForClearLocking();
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+
+    bucketRegion.releaseWriteLocksForClear(regionEvent, recipients);
+
+    verify(bucketRegion).distributedClearOperationReleaseLocks(regionEvent, recipients);
+  }
+
+  private BucketRegion bucketRegionForClearLocking() {
+    // use partial-mock with null fields to verify method invocations
+    BucketRegion bucketRegion = mock(BucketRegion.class, CALLS_REAL_METHODS);
+
+    // doNothing when invoking locking methods for clear
+    doNothing().when(bucketRegion).lockAndFlushClearToOthers(any(), any());
+
+    // doNothing when invoking unlocking methods for clear
+    doNothing().when(bucketRegion).distributedClearOperationReleaseLocks(any(), any());
+
+    return bucketRegion;
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
index 13a2685..185c67d 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
@@ -14,14 +14,19 @@
  */
 package org.apache.geode.internal.cache;
 
+import static java.util.Collections.emptySet;
 import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Matchers.eq;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -29,6 +34,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.Collections;
+import java.util.Set;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -43,36 +49,22 @@ import org.apache.geode.internal.cache.versions.VersionSource;
 import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException;
 import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
 
-
 public class DistributedRegionTest {
+
   private RegionVersionVector<VersionSource<Object>> vector;
   private RegionVersionHolder<VersionSource<Object>> holder;
   private VersionSource<Object> lostMemberVersionID;
   private InternalDistributedMember member;
 
   @Before
-  @SuppressWarnings("unchecked")
   public void setup() {
-    vector = mock(RegionVersionVector.class);
-    holder = mock(RegionVersionHolder.class);
-    lostMemberVersionID = mock(VersionSource.class);
+    vector = uncheckedCast(mock(RegionVersionVector.class));
+    holder = uncheckedCast(mock(RegionVersionHolder.class));
+    lostMemberVersionID = uncheckedCast(mock(VersionSource.class));
     member = mock(InternalDistributedMember.class);
   }
 
   @Test
-  public void shouldBeMockable() throws Exception {
-    DistributedRegion mockDistributedRegion = mock(DistributedRegion.class);
-    EntryEventImpl mockEntryEventImpl = mock(EntryEventImpl.class);
-    Object returnValue = new Object();
-
-    when(mockDistributedRegion.validatedDestroy(any(), eq(mockEntryEventImpl)))
-        .thenReturn(returnValue);
-
-    assertThat(mockDistributedRegion.validatedDestroy(new Object(), mockEntryEventImpl))
-        .isSameAs(returnValue);
-  }
-
-  @Test
   public void cleanUpAfterFailedInitialImageHoldsLockForClear() {
     DistributedRegion distributedRegion = mock(DistributedRegion.class, RETURNS_DEEP_STUBS);
     RegionMap regionMap = mock(RegionMap.class);
@@ -99,7 +91,7 @@ public class DistributedRegionTest {
 
     distributedRegion.cleanUpAfterFailedGII(true);
 
-    verify(diskRegion).resetRecoveredEntries(eq(distributedRegion));
+    verify(diskRegion).resetRecoveredEntries(distributedRegion);
     verify(distributedRegion, never()).closeEntries();
   }
 
@@ -260,4 +252,65 @@ public class DistributedRegionTest {
         .hasMessage("Parallel Gateway Sender " + senderId
             + " can not be used with replicated region " + regionPath);
   }
+
+  @Test
+  public void obtainWriteLocksForClear_invokes_lockLocallyForClear() {
+    DistributedRegion distributedRegion = distributedRegionForClearLocking();
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+
+    distributedRegion.obtainWriteLocksForClear(regionEvent, emptySet());
+
+    verify(distributedRegion).lockLocallyForClear(any(), any(), eq(regionEvent));
+  }
+
+  @Test
+  public void obtainWriteLocksForClear_invokes_lockAndFlushClearToOthers() {
+    Set<InternalDistributedMember> recipients = emptySet();
+    DistributedRegion distributedRegion = distributedRegionForClearLocking();
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+
+    distributedRegion.obtainWriteLocksForClear(regionEvent, recipients);
+
+    verify(distributedRegion).lockAndFlushClearToOthers(regionEvent, recipients);
+  }
+
+  @Test
+  public void releaseWriteLocksForClear_invokes_releaseLockLocallyForClear() {
+    DistributedRegion distributedRegion = distributedRegionForClearLocking();
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+
+    distributedRegion.releaseWriteLocksForClear(regionEvent, emptySet());
+
+    verify(distributedRegion).releaseLockLocallyForClear(regionEvent);
+  }
+
+  @Test
+  public void releaseWriteLocksForClear_invokes_distributedClearOperationReleaseLocks() {
+    Set<InternalDistributedMember> recipients = emptySet();
+    DistributedRegion distributedRegion = distributedRegionForClearLocking();
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+
+    distributedRegion.releaseWriteLocksForClear(regionEvent, recipients);
+
+    verify(distributedRegion).distributedClearOperationReleaseLocks(regionEvent, recipients);
+  }
+
+  private DistributedRegion distributedRegionForClearLocking() {
+    // use partial-mock with null fields to verify method invocations
+    DistributedRegion distributedRegion = mock(DistributedRegion.class, CALLS_REAL_METHODS);
+
+    // stub out getDistributionManager and getMyId
+    doReturn(null).when(distributedRegion).getDistributionManager();
+    doReturn(null).when(distributedRegion).getMyId();
+
+    // doNothing when invoking locking methods for clear
+    doNothing().when(distributedRegion).lockAndFlushClearToOthers(any(), any());
+    doNothing().when(distributedRegion).lockLocallyForClear(any(), any(), any());
+
+    // doNothing when invoking unlocking methods for clear
+    doNothing().when(distributedRegion).distributedClearOperationReleaseLocks(any(), any());
+    doNothing().when(distributedRegion).releaseLockLocallyForClear(any());
+
+    return distributedRegion;
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearMessageTest.java
new file mode 100644
index 0000000..4e67fc1
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearMessageTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static java.util.Collections.emptySet;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionAdvisor;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.PartitionedRegionClearMessage.OperationType;
+
+/**
+ * Unit tests for {@link PartitionedRegionClearMessage}. Every collaborator is a Mockito mock, so
+ * the tests only check how the message reacts to its inputs and which collaborator methods it
+ * invokes.
+ */
+public class PartitionedRegionClearMessageTest {
+
+  private Collection<InternalDistributedMember> recipients;
+  private DistributionManager distributionManager;
+  private PartitionedRegion partitionedRegion;
+  private ReplyProcessor21 replyProcessor21;
+  private Object callbackArgument;
+  private EventID eventId;
+  private RegionEventFactory regionEventFactory;
+
+  @Before
+  public void setUp() {
+    recipients = emptySet();
+    distributionManager = mock(DistributionManager.class);
+    partitionedRegion = mock(PartitionedRegion.class);
+    replyProcessor21 = mock(ReplyProcessor21.class);
+    callbackArgument = new Object();
+    eventId = mock(EventID.class);
+    regionEventFactory = mock(RegionEventFactory.class);
+  }
+
+  @Test
+  public void construction_throwsNullPointerExceptionIfRecipientsIsNull() {
+    Throwable thrown = catchThrowable(() -> {
+      new PartitionedRegionClearMessage(null, distributionManager, 1,
+          replyProcessor21, OperationType.OP_PR_CLEAR, callbackArgument, eventId, false,
+          regionEventFactory);
+    });
+
+    assertThat(thrown).isInstanceOf(NullPointerException.class);
+  }
+
+  @Test
+  public void construction_findsAllDependencies() {
+    boolean isTransactionDistributed = true;
+    int regionId = 10;
+    InternalCache cache = mock(InternalCache.class);
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    TXManagerImpl txManager = mock(TXManagerImpl.class);
+    when(cache.getTxManager()).thenReturn(txManager);
+    when(partitionedRegion.getCache()).thenReturn(cache);
+    when(partitionedRegion.getDistributionManager()).thenReturn(distributionManager);
+    when(partitionedRegion.getPRId()).thenReturn(regionId);
+    when(regionEvent.getEventId()).thenReturn(eventId);
+    when(regionEvent.getRawCallbackArgument()).thenReturn(callbackArgument);
+    when(txManager.isDistributed()).thenReturn(isTransactionDistributed);
+
+    PartitionedRegionClearMessage message = new PartitionedRegionClearMessage(recipients,
+        partitionedRegion,
+        replyProcessor21,
+        OperationType.OP_PR_CLEAR,
+        regionEvent);
+
+    assertThat(message.getDistributionManagerForTesting()).isSameAs(distributionManager);
+    assertThat(message.getCallbackArgumentForTesting()).isSameAs(callbackArgument);
+    assertThat(message.getRegionId()).isEqualTo(regionId);
+    assertThat(message.getEventID()).isEqualTo(eventId);
+    assertThat(message.isTransactionDistributed()).isEqualTo(isTransactionDistributed);
+
+    // this constructor takes no factory, so inspect the one the message supplies for itself;
+    // the local is named so that it does not shadow the mocked regionEventFactory field
+    RegionEventFactory factoryFromMessage = message.getRegionEventFactoryForTesting();
+    RegionEvent<?, ?> created =
+        factoryFromMessage.create(partitionedRegion, Operation.DESTROY, callbackArgument, false,
+            mock(DistributedMember.class), mock(EventID.class));
+    assertThat(created).isInstanceOf(RegionEventImpl.class);
+  }
+
+  @Test
+  public void construction_setsTransactionDistributed() {
+    boolean isTransactionDistributed = true;
+    PartitionedRegionClearMessage message =
+        new PartitionedRegionClearMessage(recipients, distributionManager, 1,
+            replyProcessor21, OperationType.OP_PR_CLEAR, callbackArgument, eventId,
+            isTransactionDistributed, regionEventFactory);
+
+    boolean value = message.isTransactionDistributed();
+
+    assertThat(value).isEqualTo(isTransactionDistributed);
+  }
+
+  @Test
+  public void getEventID_returnsTheEventId() {
+    PartitionedRegionClearMessage message =
+        createMessage(distributionManager, OperationType.OP_PR_CLEAR);
+
+    EventID value = message.getEventID();
+
+    assertThat(value).isSameAs(eventId);
+  }
+
+  @Test
+  public void getOperationType_returnsTheOperationType() {
+    PartitionedRegionClearMessage message =
+        createMessage(distributionManager, OperationType.OP_PR_CLEAR);
+
+    OperationType value = message.getOperationType();
+
+    assertThat(value).isSameAs(OperationType.OP_PR_CLEAR);
+  }
+
+  @Test
+  public void send_putsOutgoing() {
+    PartitionedRegionClearMessage message =
+        createMessage(distributionManager, OperationType.OP_PR_CLEAR);
+
+    message.send();
+
+    verify(distributionManager).putOutgoing(message);
+  }
+
+  @Test
+  public void processCheckForPR_returnsForceReattemptException_whenRegionIsNotInitialized() {
+    DistributionAdvisor distributionAdvisor = mock(DistributionAdvisor.class);
+    when(distributionAdvisor.isInitialized()).thenReturn(false);
+    when(partitionedRegion.getDistributionAdvisor()).thenReturn(distributionAdvisor);
+    PartitionedRegionClearMessage message =
+        createMessage(distributionManager, OperationType.OP_PR_CLEAR);
+
+    Throwable throwable = message.processCheckForPR(partitionedRegion, distributionManager);
+
+    assertThat(throwable)
+        .isInstanceOf(ForceReattemptException.class)
+        .hasMessageContaining("could not find partitioned region with Id");
+  }
+
+  @Test
+  public void processCheckForPR_returnsNull_whenRegionIsNull() {
+    PartitionedRegionClearMessage message =
+        createMessage(distributionManager, OperationType.OP_PR_CLEAR);
+
+    Throwable throwable = message.processCheckForPR(null, distributionManager);
+
+    assertThat(throwable).isNull();
+  }
+
+  @Test
+  public void processCheckForPR_returnsNull_whenRegionIsInitialized() {
+    DistributionAdvisor distributionAdvisor = mock(DistributionAdvisor.class);
+    when(distributionAdvisor.isInitialized()).thenReturn(true);
+    when(partitionedRegion.getDistributionAdvisor()).thenReturn(distributionAdvisor);
+    PartitionedRegionClearMessage message =
+        createMessage(distributionManager, OperationType.OP_PR_CLEAR);
+
+    // pass the stubbed region (not null) so that the initialized-region path is what runs
+    Throwable throwable = message.processCheckForPR(partitionedRegion, distributionManager);
+
+    assertThat(throwable).isNull();
+  }
+
+  @Test
+  public void operateOnPartitionedRegion_returnsTrue_whenRegionIsNull() {
+    ClusterDistributionManager clusterDistributionManager = mock(ClusterDistributionManager.class);
+    PartitionedRegionClearMessage message =
+        createMessage(distributionManager, OperationType.OP_PR_CLEAR);
+
+    // pass a null region so that the scenario named by this test is what actually runs
+    boolean result = message.operateOnPartitionedRegion(clusterDistributionManager, null, 30);
+
+    assertThat(result).isTrue();
+  }
+
+  @Test
+  public void operateOnPartitionedRegion_returnsTrue_whenRegionIsDestroyed() {
+    ClusterDistributionManager clusterDistributionManager = mock(ClusterDistributionManager.class);
+    when(partitionedRegion.isDestroyed()).thenReturn(true);
+    PartitionedRegionClearMessage message =
+        createMessage(distributionManager, OperationType.OP_PR_CLEAR);
+
+    boolean result =
+        message.operateOnPartitionedRegion(clusterDistributionManager, partitionedRegion, 30);
+
+    assertThat(result).isTrue();
+  }
+
+  @Test
+  public void operateOnPartitionedRegion_obtainsClearLockLocal_whenOperationTypeIs_OP_LOCK_FOR_PR_CLEAR() {
+    ClusterDistributionManager clusterDistributionManager = mock(ClusterDistributionManager.class);
+    PartitionedRegionClear partitionedRegionClear = mock(PartitionedRegionClear.class);
+    when(partitionedRegion.getPartitionedRegionClear()).thenReturn(partitionedRegionClear);
+    PartitionedRegionClearMessage message =
+        createMessage(clusterDistributionManager, OperationType.OP_LOCK_FOR_PR_CLEAR);
+
+    boolean result =
+        message.operateOnPartitionedRegion(clusterDistributionManager, partitionedRegion, 30);
+
+    assertThat(result).isTrue();
+    verify(partitionedRegionClear).obtainClearLockLocal(any());
+  }
+
+  @Test
+  public void operateOnPartitionedRegion_releasesClearLockLocal_whenOperationTypeIs_OP_UNLOCK_FOR_PR_CLEAR() {
+    ClusterDistributionManager clusterDistributionManager = mock(ClusterDistributionManager.class);
+    PartitionedRegionClear partitionedRegionClear = mock(PartitionedRegionClear.class);
+    when(partitionedRegion.getPartitionedRegionClear()).thenReturn(partitionedRegionClear);
+    PartitionedRegionClearMessage message =
+        createMessage(clusterDistributionManager, OperationType.OP_UNLOCK_FOR_PR_CLEAR);
+
+    boolean result =
+        message.operateOnPartitionedRegion(clusterDistributionManager, partitionedRegion, 30);
+
+    assertThat(result).isTrue();
+    verify(partitionedRegionClear).releaseClearLockLocal();
+  }
+
+  @Test
+  public void operateOnPartitionedRegion_clearsRegionLocal_whenOperationTypeIs_OP_PR_CLEAR() {
+    ClusterDistributionManager clusterDistributionManager = mock(ClusterDistributionManager.class);
+    PartitionedRegionClear partitionedRegionClear = mock(PartitionedRegionClear.class);
+    when(partitionedRegion.getPartitionedRegionClear())
+        .thenReturn(partitionedRegionClear);
+    when(regionEventFactory.create(any(), any(), any(), anyBoolean(), any(), any()))
+        .thenReturn(mock(RegionEventImpl.class));
+    PartitionedRegionClearMessage message =
+        createMessage(clusterDistributionManager, OperationType.OP_PR_CLEAR);
+
+    boolean result =
+        message.operateOnPartitionedRegion(clusterDistributionManager, partitionedRegion, 30);
+
+    assertThat(result).isTrue();
+    verify(partitionedRegionClear).clearRegionLocal(any(RegionEventImpl.class));
+  }
+
+  /**
+   * Creates a message from the shared fixtures, passing {@code 1} as the int argument and
+   * {@code false} as the boolean argument of the nine-argument constructor.
+   *
+   * @param manager the distribution manager handed to the message
+   * @param operationType the operation the message should carry
+   * @return the newly constructed message
+   */
+  private PartitionedRegionClearMessage createMessage(DistributionManager manager,
+      OperationType operationType) {
+    return new PartitionedRegionClearMessage(recipients, manager, 1, replyProcessor21,
+        operationType, callbackArgument, eventId, false, regionEventFactory);
+  }
+}