Posted to commits@geode.apache.org by ji...@apache.org on 2020/10/14 18:22:17 UTC

[geode] 02/22: GEODE-7682: add PR.clear API (#4755)

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 04df003f548e3c7d8006872d7cfba97e61359a28
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Thu Mar 5 23:46:36 2020 -0800

    GEODE-7682: add PR.clear API (#4755)
    
    * GEODE-7683: introduce BR.cmnClearRegion
    
    Co-authored-by: Xiaojian Zhou <gz...@pivotal.io>
---
 .../cache/PartitionedRegionClearDUnitTest.java     | 218 +++++++++++++++++++++
 .../PartitionedRegionPersistentClearDUnitTest.java |  26 +++
 ...itionedRegionSingleNodeOperationsJUnitTest.java |  66 -------
 .../codeAnalysis/sanctionedDataSerializables.txt   |   4 +-
 .../org/apache/geode/internal/DSFIDFactory.java    |   3 +
 .../geode/internal/cache/DistributedRegion.java    |   9 -
 .../apache/geode/internal/cache/LocalRegion.java   |  10 +
 .../geode/internal/cache/PartitionedRegion.java    | 214 ++++++++++++++++++--
 .../geode/internal/cache/RegionEventImpl.java      |   5 +
 .../internal/cache/partitioned/ClearPRMessage.java | 166 +++++-----------
 .../cache/partitioned/ClearPRMessageTest.java      |  50 ++---
 11 files changed, 522 insertions(+), 249 deletions(-)
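
After this change, Region.clear() on a partitioned region no longer throws
UnsupportedOperationException. A minimal usage sketch, assuming a peer cache;
the region name and key/value types here are arbitrary and not part of the commit:

    import org.apache.geode.cache.Cache;
    import org.apache.geode.cache.CacheFactory;
    import org.apache.geode.cache.Region;
    import org.apache.geode.cache.RegionShortcut;

    public class PartitionedRegionClearExample {
      public static void main(String[] args) {
        // Start a peer cache and create a partitioned region.
        Cache cache = new CacheFactory().create();
        Region<Integer, String> region =
            cache.<Integer, String>createRegionFactory(RegionShortcut.PARTITION)
                .create("testPR");

        region.put(1, "one");
        region.put(2, "two");

        // Previously this threw UnsupportedOperationException on a
        // PartitionedRegion; with this commit it clears every bucket.
        region.clear();
        assert region.isEmpty();

        cache.close();
      }
    }

Internally the clear is fanned out as one ClearPRMessage per bucket to the
bucket primaries; see PartitionedRegion.basicClear and sendClearMsgByBucket in
the diff below.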

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
new file mode 100644
index 0000000..fb2a81b
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.internal.Assert.fail;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.test.dunit.SerializableCallableIF;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+public class PartitionedRegionClearDUnitTest implements Serializable {
+  protected static final String REGION_NAME = "testPR";
+  protected static final int NUM_ENTRIES = 1000;
+
+  protected int locatorPort;
+  protected MemberVM locator;
+  protected MemberVM dataStore1, dataStore2, dataStore3, accessor;
+  protected ClientVM client1, client2;
+
+  private static final Logger logger = LogManager.getLogger();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule(7);
+
+  @Before
+  public void setUp() throws Exception {
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+    dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort);
+    dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort);
+    dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort);
+    accessor = cluster.startServerVM(4, getProperties(), locatorPort);
+    client1 = cluster.startClientVM(5,
+        c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
+    client2 = cluster.startClientVM(6,
+        c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
+    dataStore1.invoke(this::initDataStore);
+    dataStore2.invoke(this::initDataStore);
+    dataStore3.invoke(this::initDataStore);
+    accessor.invoke(this::initAccessor);
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+  }
+
+  protected RegionShortcut getRegionShortCut() {
+    return RegionShortcut.PARTITION_REDUNDANT;
+  }
+
+  protected Properties getProperties() {
+    Properties properties = new Properties();
+    properties.setProperty("log-level", "info");
+    return properties;
+  }
+
+  private Region getRegion(boolean isClient) {
+    if (isClient) {
+      return getClientCache().getRegion(REGION_NAME);
+    } else {
+      return getCache().getRegion(REGION_NAME);
+    }
+  }
+
+  private void verifyRegionSize(boolean isClient, int expectedNum) {
+    assertThat(getRegion(isClient).size()).isEqualTo(expectedNum);
+  }
+
+  private void initClientCache() {
+    Region region = getClientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+        .create(REGION_NAME);
+    region.registerInterestForAllKeys(InterestResultPolicy.KEYS);
+  }
+
+  private void initDataStore() {
+    getCache().createRegionFactory(getRegionShortCut())
+        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create())
+        .addCacheListener(new CountingCacheListener())
+        .create(REGION_NAME);
+  }
+
+  private void initAccessor() {
+    RegionShortcut shortcut = getRegionShortCut();
+    if (shortcut.isPersistent()) {
+      if (shortcut == RegionShortcut.PARTITION_PERSISTENT) {
+        shortcut = RegionShortcut.PARTITION;
+      } else if (shortcut == RegionShortcut.PARTITION_PERSISTENT_OVERFLOW) {
+        shortcut = RegionShortcut.PARTITION_OVERFLOW;
+      } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT) {
+        shortcut = RegionShortcut.PARTITION_REDUNDANT;
+      } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW) {
+        shortcut = RegionShortcut.PARTITION_REDUNDANT_OVERFLOW;
+      } else {
+        fail("Wrong region type:" + shortcut);
+      }
+    }
+    getCache().createRegionFactory(shortcut)
+        .setPartitionAttributes(
+            new PartitionAttributesFactory()
+                .setTotalNumBuckets(10).setLocalMaxMemory(0).create())
+        .addCacheListener(new CountingCacheListener())
+        .create(REGION_NAME);
+  }
+
+  private void feed(boolean isClient) {
+    Region region = getRegion(isClient);
+    IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, "value" + i));
+  }
+
+  private void verifyServerRegionSize(int expectedNum) {
+    accessor.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore1.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore2.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore3.invoke(() -> verifyRegionSize(false, expectedNum));
+  }
+
+  private void verifyClientRegionSize(int expectedNum) {
+    client1.invoke(() -> verifyRegionSize(true, expectedNum));
+    // TODO: notify registered clients
+    // client2.invoke(()->verifyRegionSize(true, expectedNum));
+  }
+
+  private void verifyCacheListenerTriggerCount(MemberVM serverVM) {
+    SerializableCallableIF<Integer> getListenerTriggerCount = () -> {
+      CountingCacheListener countingCacheListener =
+          (CountingCacheListener) getRegion(false).getAttributes()
+              .getCacheListeners()[0];
+      return countingCacheListener.getClears();
+    };
+
+    int count = accessor.invoke(getListenerTriggerCount)
+        + dataStore1.invoke(getListenerTriggerCount)
+        + dataStore2.invoke(getListenerTriggerCount)
+        + dataStore3.invoke(getListenerTriggerCount);
+    assertThat(count).isEqualTo(1);
+
+    if (serverVM != null) {
+      assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1);
+    }
+  }
+
+  @Test
+  public void normalClearFromDataStore() {
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+    dataStore1.invoke(() -> getRegion(false).clear());
+    verifyServerRegionSize(0);
+    verifyCacheListenerTriggerCount(dataStore1);
+  }
+
+  @Test
+  public void normalClearFromAccessor() {
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+    accessor.invoke(() -> getRegion(false).clear());
+    verifyServerRegionSize(0);
+    verifyCacheListenerTriggerCount(accessor);
+  }
+
+  @Test
+  public void normalClearFromClient() {
+    client1.invoke(() -> feed(true));
+    verifyClientRegionSize(NUM_ENTRIES);
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    client1.invoke(() -> getRegion(true).clear());
+    verifyServerRegionSize(0);
+    verifyClientRegionSize(0);
+    verifyCacheListenerTriggerCount(null);
+  }
+
+  private static class CountingCacheListener extends CacheListenerAdapter {
+    private final AtomicInteger clears = new AtomicInteger();
+
+    @Override
+    public void afterRegionClear(RegionEvent event) {
+      Region region = event.getRegion();
+      logger.info("Region " + region.getFullPath() + " is cleared.");
+      clears.incrementAndGet();
+    }
+
+    int getClears() {
+      return clears.get();
+    }
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java
new file mode 100644
index 0000000..847699b
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+
+
+import org.apache.geode.cache.RegionShortcut;
+
+public class PartitionedRegionPersistentClearDUnitTest extends PartitionedRegionClearDUnitTest {
+
+  protected RegionShortcut getRegionShortCut() {
+    return RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW;
+  }
+}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java
index b37945b..4f36060 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.fail;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.Set;
@@ -1298,71 +1297,6 @@ public class PartitionedRegionSingleNodeOperationsJUnitTest {
     }
   }
 
-  @Test
-  public void test023UnsupportedOps() throws Exception {
-    Region pr = null;
-    try {
-      pr = PartitionedRegionTestHelper.createPartitionedRegion("testUnsupportedOps",
-          String.valueOf(200), 0);
-
-      pr.put(new Integer(1), "one");
-      pr.put(new Integer(2), "two");
-      pr.put(new Integer(3), "three");
-      pr.getEntry("key");
-
-      try {
-        pr.clear();
-        fail(
-            "PartitionedRegionSingleNodeOperationTest:testUnSupportedOps() operation failed on a blank PartitionedRegion");
-      } catch (UnsupportedOperationException expected) {
-      }
-
-      // try {
-      // pr.entries(true);
-      // fail();
-      // }
-      // catch (UnsupportedOperationException expected) {
-      // }
-
-      // try {
-      // pr.entrySet(true);
-      // fail();
-      // }
-      // catch (UnsupportedOperationException expected) {
-      // }
-
-      try {
-        HashMap data = new HashMap();
-        data.put("foo", "bar");
-        data.put("bing", "bam");
-        data.put("supper", "hero");
-        pr.putAll(data);
-        // fail("testPutAll() does NOT throw UnsupportedOperationException");
-      } catch (UnsupportedOperationException onse) {
-      }
-
-
-      // try {
-      // pr.values();
-      // fail("testValues() does NOT throw UnsupportedOperationException");
-      // }
-      // catch (UnsupportedOperationException expected) {
-      // }
-
-
-      try {
-        pr.containsValue("foo");
-      } catch (UnsupportedOperationException ex) {
-        fail("PartitionedRegionSingleNodeOperationTest:testContainsValue() operation failed");
-      }
-
-    } finally {
-      if (pr != null) {
-        pr.destroyRegion();
-      }
-    }
-  }
-
   /**
    * This method validates size operations. It verifies that it returns correct size of the
    * PartitionedRegion.
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 8e522a2..fb83c84 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1377,8 +1377,8 @@ fromData,27
 toData,27
 
 org/apache/geode/internal/cache/partitioned/ClearPRMessage,2
-fromData,30
-toData,44
+fromData,19
+toData,36
 
 org/apache/geode/internal/cache/partitioned/ClearPRMessage$ClearReplyMessage,2
 fromData,17
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index 504e7d1..26d92c9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -289,6 +289,7 @@ import org.apache.geode.internal.cache.partitioned.BucketCountLoadProbe;
 import org.apache.geode.internal.cache.partitioned.BucketProfileUpdateMessage;
 import org.apache.geode.internal.cache.partitioned.BucketSizeMessage;
 import org.apache.geode.internal.cache.partitioned.BucketSizeMessage.BucketSizeReplyMessage;
+import org.apache.geode.internal.cache.partitioned.ClearPRMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueReplyMessage;
 import org.apache.geode.internal.cache.partitioned.CreateBucketMessage;
@@ -985,6 +986,8 @@ public class DSFIDFactory implements DataSerializableFixedID {
     serializer.registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY,
         GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationEntry.class);
     serializer.registerDSFID(ABORT_BACKUP_REQUEST, AbortBackupRequest.class);
+    serializer.registerDSFID(PR_CLEAR_MESSAGE, ClearPRMessage.class);
+    serializer.registerDSFID(PR_CLEAR_REPLY_MESSAGE, ClearPRMessage.ClearReplyMessage.class);
     serializer.registerDSFID(HOST_AND_PORT, HostAndPort.class);
     serializer.registerDSFID(DISTRIBUTED_PING_MESSAGE, DistributedPingMessage.class);
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 489d85a..84b5a3b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -192,10 +192,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
   @MutableForTesting
   public static boolean ignoreReconnect = false;
 
-  /**
-   * Lock to prevent multiple threads on this member from performing a clear at the same time.
-   */
-  private final Object clearLock = new Object();
   private final ReentrantReadWriteLock failedInitialImageLock = new ReentrantReadWriteLock(true);
 
   @MakeNotStatic
@@ -933,11 +929,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
     }
   }
 
-  private void lockCheckReadiness() {
-    cache.getCancelCriterion().checkCancelInProgress(null);
-    checkReadiness();
-  }
-
   @Override
   Object validatedDestroy(Object key, EntryEventImpl event)
       throws TimeoutException, EntryNotFoundException, CacheWriterException {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 4236042..4268786 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -471,6 +471,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
   private final Lock clientMetaDataLock = new ReentrantLock();
 
   /**
+   * Lock to prevent multiple threads on this member from performing a clear at the same time.
+   */
+  protected final Object clearLock = new Object();
+
+  /**
    * Lock for updating the cache service profile for the region.
    */
   private final Lock cacheServiceProfileUpdateLock = new ReentrantLock();
@@ -2748,6 +2753,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     checkRegionDestroyed(true);
   }
 
+  protected void lockCheckReadiness() {
+    cache.getCancelCriterion().checkCancelInProgress(null);
+    checkReadiness();
+  }
+
   /**
    * This method should be called when the caller cannot locate an entry and that condition is
    * unexpected. This will first double check the cache and region state before throwing an
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 8411a13..1aa427a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -181,6 +181,7 @@ import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultWa
 import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
 import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
+import org.apache.geode.internal.cache.partitioned.ClearPRMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueResponse;
 import org.apache.geode.internal.cache.partitioned.DestroyMessage;
@@ -2173,18 +2174,202 @@ public class PartitionedRegion extends LocalRegion
     throw new UnsupportedOperationException();
   }
 
-  /**
-   * @since GemFire 5.0
-   * @throws UnsupportedOperationException OVERRIDES
-   */
   @Override
-  public void clear() {
-    throw new UnsupportedOperationException();
+  void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    synchronized (clearLock) {
+      final DistributedLockService lockService = getPartitionedRegionLockService();
+      try {
+        lockService.lock("_clearOperation" + this.getFullPath().replace('/', '_'), -1, -1);
+      } catch (IllegalStateException e) {
+        lockCheckReadiness();
+        throw e;
+      }
+      try {
+        if (cache.isCacheAtShutdownAll()) {
+          throw cache.getCacheClosedException("Cache is shutting down");
+        }
+
+        // create ClearPRMessage per bucket
+        List<ClearPRMessage> clearMsgList = createClearPRMessages();
+        for (ClearPRMessage clearPRMessage : clearMsgList) {
+          int bucketId = clearPRMessage.getBucketId();
+          checkReadiness();
+          long sendMessagesStartTime = 0;
+          if (isDebugEnabled) {
+            sendMessagesStartTime = System.currentTimeMillis();
+          }
+          try {
+            sendClearMsgByBucket(bucketId, clearPRMessage);
+          } catch (PartitionOfflineException poe) {
+            // TODO add a PartialResultException
+            logger.info("PR.sendClearMsgByBucket encountered PartitionOfflineException at bucket "
+                + bucketId, poe);
+          } catch (Exception e) {
+            logger.info("PR.sendClearMsgByBucket encountered exception at bucket " + bucketId, e);
+          }
+
+          if (isDebugEnabled) {
+            long now = System.currentTimeMillis();
+            logger.debug("PR.sendClearMsgByBucket for bucket {} took {} ms", bucketId,
+                (now - sendMessagesStartTime));
+          }
+          // TODO add psStats
+        }
+      } finally {
+        try {
+          lockService.unlock("_clearOperation" + this.getFullPath().replace('/', '_'));
+        } catch (IllegalStateException e) {
+          lockCheckReadiness();
+        }
+      }
+
+      // notify bridge clients at PR level
+      regionEvent.setEventType(EnumListenerEvent.AFTER_REGION_CLEAR);
+      boolean hasListener = hasListener();
+      if (hasListener) {
+        dispatchListenerEvent(EnumListenerEvent.AFTER_REGION_CLEAR, regionEvent);
+      }
+      notifyBridgeClients(regionEvent);
+      logger.info("Partitioned region {} finsihed clear operation.", this.getFullPath());
+    }
   }
 
-  @Override
-  void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) {
-    throw new UnsupportedOperationException();
+  void sendClearMsgByBucket(final Integer bucketId, ClearPRMessage clearPRMessage) {
+    RetryTimeKeeper retryTime = null;
+    InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId, null);
+    if (logger.isDebugEnabled()) {
+      logger.debug("PR.sendClearMsgByBucket:bucket {}'s currentTarget is {}", bucketId,
+          currentTarget);
+    }
+
+    long timeOut = 0;
+    int count = 0;
+    while (true) {
+      switch (count) {
+        case 0:
+          // Note we don't check for DM cancellation in common case.
+          // First time. Assume success, keep going.
+          break;
+        case 1:
+          this.cache.getCancelCriterion().checkCancelInProgress(null);
+          // Second time (first failure). Calculate timeout and keep going.
+          timeOut = System.currentTimeMillis() + this.retryTimeout;
+          break;
+        default:
+          this.cache.getCancelCriterion().checkCancelInProgress(null);
+          // test for timeout
+          long timeLeft = timeOut - System.currentTimeMillis();
+          if (timeLeft < 0) {
+            PRHARedundancyProvider.timedOut(this, null, null, "clear bucket " + bucketId,
+                this.retryTimeout);
+            // NOTREACHED
+          }
+
+          // Didn't time out. Sleep a bit and then continue
+          boolean interrupted = Thread.interrupted();
+          try {
+            Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION);
+          } catch (InterruptedException ignore) {
+            interrupted = true;
+          } finally {
+            if (interrupted) {
+              Thread.currentThread().interrupt();
+            }
+          }
+          break;
+      } // switch
+      count++;
+
+      if (currentTarget == null) { // pick target
+        checkReadiness();
+        if (retryTime == null) {
+          retryTime = new RetryTimeKeeper(this.retryTimeout);
+        }
+
+        currentTarget = waitForNodeOrCreateBucket(retryTime, null, bucketId, false);
+        if (currentTarget == null) {
+          // the bucket does not exist, no need to clear
+          logger.info("Bucket " + bucketId + " does not contain data, no need to clear");
+          return;
+        } else {
+          if (logger.isDebugEnabled()) {
+            logger.debug("PR.sendClearMsgByBucket: new currentTarget is {}", currentTarget);
+          }
+        }
+
+        // It's possible this is a GemFire thread e.g. ServerConnection
+        // which got to this point because of a distributed system shutdown or
+        // region closure which uses interrupt to break any sleep() or wait() calls
+        // e.g. waitForPrimary or waitForBucketRecovery in which case throw exception
+        checkShutdown();
+        continue;
+      } // pick target
+
+      boolean result = false;
+      try {
+        final boolean isLocal = (this.localMaxMemory > 0) && currentTarget.equals(getMyId());
+        if (isLocal) {
+          result = clearPRMessage.doLocalClear(this);
+        } else {
+          ClearPRMessage.ClearResponse response = clearPRMessage.send(currentTarget, this);
+          if (response != null) {
+            this.prStats.incPartitionMessagesSent();
+            result = response.waitForResult();
+          }
+        }
+        if (result) {
+          return;
+        }
+      } catch (ForceReattemptException fre) {
+        checkReadiness();
+        InternalDistributedMember lastTarget = currentTarget;
+        if (retryTime == null) {
+          retryTime = new RetryTimeKeeper(this.retryTimeout);
+        }
+        currentTarget = getNodeForBucketWrite(bucketId, retryTime);
+        if (lastTarget.equals(currentTarget)) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("PR.sendClearMsgByBucket: Retrying at the same node:{} due to {}",
+                currentTarget, fre.getMessage());
+          }
+          if (retryTime.overMaximum()) {
+            PRHARedundancyProvider.timedOut(this, null, null, "clear a bucket",
+                this.retryTimeout);
+            // NOTREACHED
+          }
+          retryTime.waitToRetryNode();
+        } else {
+          if (logger.isDebugEnabled()) {
+            logger.debug("PR.sendClearMsgByBucket: Old target was {}, Retrying {}", lastTarget,
+                currentTarget);
+          }
+        }
+      }
+
+      // It's possible this is a GemFire thread e.g. ServerConnection
+      // which got to this point because of a distributed system shutdown or
+      // region closure which uses interrupt to break any sleep() or wait()
+      // calls
+      // e.g. waitForPrimary or waitForBucketRecovery in which case throw
+      // exception
+      checkShutdown();
+
+      // If we get here, the attempt failed...
+      if (count == 1) {
+        // TODO prStats add ClearPRMsg retried
+        this.prStats.incPutAllMsgsRetried();
+      }
+    }
+  }
+
+  List<ClearPRMessage> createClearPRMessages() {
+    ArrayList<ClearPRMessage> clearMsgList = new ArrayList<>();
+    for (int bucketId = 0; bucketId < this.totalNumberOfBuckets; bucketId++) {
+      ClearPRMessage clearPRMessage = new ClearPRMessage(bucketId);
+      clearMsgList.add(clearPRMessage);
+    }
+    return clearMsgList;
   }
 
   @Override
@@ -2603,7 +2788,7 @@ public class PartitionedRegion extends LocalRegion
             retryTime = new RetryTimeKeeper(this.retryTimeout);
           }
 
-          currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId);
+          currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true);
           if (isDebugEnabled) {
             logger.debug("PR.sendMsgByBucket: event size is {}, new currentTarget is {}",
                 getEntrySize(event), currentTarget);
@@ -2742,7 +2927,7 @@ public class PartitionedRegion extends LocalRegion
             retryTime = new RetryTimeKeeper(this.retryTimeout);
           }
 
-          currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId);
+          currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true);
           if (logger.isDebugEnabled()) {
             logger.debug("PR.sendMsgByBucket: event size is {}, new currentTarget is {}",
                 getEntrySize(event), currentTarget);
@@ -2987,7 +3172,7 @@ public class PartitionedRegion extends LocalRegion
         if (retryTime == null) {
           retryTime = new RetryTimeKeeper(this.retryTimeout);
         }
-        currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId);
+        currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true);
 
         // It's possible this is a GemFire thread e.g. ServerConnection
         // which got to this point because of a distributed system shutdown or
@@ -3146,10 +3331,11 @@ public class PartitionedRegion extends LocalRegion
    * @param retryTime the RetryTimeKeeper to track retry times
    * @param event the event used to get the entry size in the event a new bucket should be created
    * @param bucketId the identity of the bucket should it be created
+   * @param createIfNotExist whether to create the bucket if it does not exist
    * @return a Node which contains the bucket, potentially null
    */
   private InternalDistributedMember waitForNodeOrCreateBucket(RetryTimeKeeper retryTime,
-      EntryEventImpl event, Integer bucketId) {
+      EntryEventImpl event, Integer bucketId, boolean createIfNotExist) {
     InternalDistributedMember newNode;
     if (retryTime.overMaximum()) {
       PRHARedundancyProvider.timedOut(this, null, null, "allocate a bucket",
@@ -3159,7 +3345,7 @@ public class PartitionedRegion extends LocalRegion
 
     retryTime.waitForBucketsRecovery();
     newNode = getNodeForBucketWrite(bucketId, retryTime);
-    if (newNode == null) {
+    if (newNode == null && createIfNotExist) {
       newNode = createBucket(bucketId, getEntrySize(event), retryTime);
     }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
index fba513d..49dc932 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
@@ -119,6 +119,11 @@ public class RegionEventImpl
     return region;
   }
 
+  public void setRegion(LocalRegion region) {
+    this.region = region;
+    this.distributedMember = region.getMyId();
+  }
+
   @Override
   public Operation getOperation() {
     return this.op;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
index 1a8aba1..9fa8057 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
@@ -26,7 +26,8 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.CacheException;
-import org.apache.geode.distributed.DistributedLockService;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.persistence.PartitionOfflineException;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DirectReplyProcessor;
@@ -44,7 +45,6 @@ import org.apache.geode.internal.cache.BucketRegion;
 import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.PartitionedRegionHelper;
 import org.apache.geode.internal.cache.RegionEventImpl;
 import org.apache.geode.internal.logging.log4j.LogMarker;
 import org.apache.geode.internal.serialization.DeserializationContext;
@@ -54,16 +54,10 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
 public class ClearPRMessage extends PartitionMessageWithDirectReply {
   private static final Logger logger = LogService.getLogger();
 
-  private RegionEventImpl regionEvent;
-
   private Integer bucketId;
 
-  /** The time in ms to wait for a lock to be obtained during doLocalClear() */
-  public static final int LOCK_WAIT_TIMEOUT_MS = 1000;
   public static final String BUCKET_NON_PRIMARY_MESSAGE =
       "The bucket region on target member is no longer primary";
-  public static final String BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE =
-      "A lock for the bucket region could not be obtained.";
   public static final String EXCEPTION_THROWN_DURING_CLEAR_OPERATION =
       "An exception was thrown during the local clear operation: ";
 
@@ -79,14 +73,6 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
 
   public ClearPRMessage(int bucketId) {
     this.bucketId = bucketId;
-
-    // These are both used by the parent class, but don't apply to this message type
-    this.notificationOnly = false;
-    this.posDup = false;
-  }
-
-  public void setRegionEvent(RegionEventImpl event) {
-    regionEvent = event;
   }
 
   public void initMessage(PartitionedRegion region, Set<InternalDistributedMember> recipients,
@@ -103,16 +89,6 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
     }
   }
 
-  @Override
-  public boolean isSevereAlertCompatible() {
-    // allow forced-disconnect processing for all cache op messages
-    return true;
-  }
-
-  public RegionEventImpl getRegionEvent() {
-    return regionEvent;
-  }
-
   public ClearResponse send(DistributedMember recipient, PartitionedRegion region)
       throws ForceReattemptException {
     Set<InternalDistributedMember> recipients =
@@ -125,7 +101,7 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
 
     Set<InternalDistributedMember> failures = region.getDistributionManager().putOutgoing(this);
     if (failures != null && failures.size() > 0) {
-      throw new ForceReattemptException("Failed sending <" + this + ">");
+      throw new ForceReattemptException("Failed sending <" + this + "> due to " + failures);
     }
     return clearResponse;
   }
@@ -143,7 +119,6 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
     } else {
       InternalDataSerializer.writeSignedVL(bucketId, out);
     }
-    DataSerializer.writeObject(regionEvent, out);
   }
 
   @Override
@@ -151,12 +126,11 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
       throws IOException, ClassNotFoundException {
     super.fromData(in, context);
     this.bucketId = (int) InternalDataSerializer.readSignedVL(in);
-    this.regionEvent = DataSerializer.readObject(in);
   }
 
   @Override
   public EventID getEventID() {
-    return regionEvent.getEventId();
+    return null;
   }
 
   /**
@@ -169,60 +143,51 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
   protected boolean operateOnPartitionedRegion(ClusterDistributionManager distributionManager,
       PartitionedRegion region, long startTime) {
     try {
-      result = doLocalClear(region);
+      this.result = doLocalClear(region);
     } catch (ForceReattemptException ex) {
       sendReply(getSender(), getProcessorId(), distributionManager, new ReplyException(ex), region,
           startTime);
       return false;
     }
-    sendReply(getSender(), getProcessorId(), distributionManager, null, region, startTime);
-    return false;
+    return this.result;
   }
 
-  public boolean doLocalClear(PartitionedRegion region) throws ForceReattemptException {
+  public Integer getBucketId() {
+    return this.bucketId;
+  }
+
+  public boolean doLocalClear(PartitionedRegion region)
+      throws ForceReattemptException {
     // Retrieve local bucket region which matches target bucketId
-    BucketRegion bucketRegion = region.getDataStore().getInitializedBucketForId(null, bucketId);
+    BucketRegion bucketRegion =
+        region.getDataStore().getInitializedBucketForId(null, this.bucketId);
 
-    // Check if we are primary, throw exception if not
-    if (!bucketRegion.isPrimary()) {
+    boolean lockedForPrimary = bucketRegion.doLockForPrimary(false);
+    // Check if we obtained primary lock, throw exception if not
+    if (!lockedForPrimary) {
       throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
     }
-
-    DistributedLockService lockService = getPartitionRegionLockService();
-    String lockName = bucketRegion.getFullPath();
     try {
-      boolean locked = lockService.lock(lockName, LOCK_WAIT_TIMEOUT_MS, -1);
-
-      if (!locked) {
-        throw new ForceReattemptException(BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE);
-      }
-
-      // Double check if we are still primary, as this could have changed between our first check
-      // and obtaining the lock
-      if (!bucketRegion.isPrimary()) {
-        throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
-      }
-
-      try {
-        bucketRegion.cmnClearRegion(regionEvent, true, true);
-      } catch (Exception ex) {
-        throw new ForceReattemptException(
-            EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex);
-      }
-
+      RegionEventImpl regionEvent = new RegionEventImpl();
+      regionEvent.setOperation(Operation.REGION_CLEAR);
+      regionEvent.setRegion(bucketRegion);
+      bucketRegion.cmnClearRegion(regionEvent, true, true);
+    } catch (PartitionOfflineException poe) {
+      logger.info(
+          "All members holding data for bucket {} are offline, no more retries will be attempted",
+          this.bucketId,
+          poe);
+      throw poe;
+    } catch (Exception ex) {
+      throw new ForceReattemptException(
+          EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex);
     } finally {
-      lockService.unlock(lockName);
+      bucketRegion.doUnlockForPrimary();
     }
 
     return true;
   }
 
-  // Extracted for testing
-  protected DistributedLockService getPartitionRegionLockService() {
-    return DistributedLockService
-        .getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
-  }
-
   @Override
   public boolean canStartRemoteTransaction() {
     return false;
@@ -247,39 +212,7 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
     buff.append("; bucketId=").append(this.bucketId);
   }
 
-  @Override
-  public String toString() {
-    StringBuilder buff = new StringBuilder();
-    String className = getClass().getName();
-    buff.append(className.substring(className.indexOf(PN_TOKEN) + PN_TOKEN.length())); // partition.<foo>
-    buff.append("(prid="); // make sure this is the first one
-    buff.append(this.regionId);
-
-    // Append name, if we have it
-    String name = null;
-    try {
-      PartitionedRegion region = PartitionedRegion.getPRFromId(this.regionId);
-      if (region != null) {
-        name = region.getFullPath();
-      }
-    } catch (Exception ignore) {
-      /* ignored */
-    }
-    if (name != null) {
-      buff.append(" (name = \"").append(name).append("\")");
-    }
-
-    appendFields(buff);
-    buff.append(" ,distTx=");
-    buff.append(this.isTransactionDistributed);
-    buff.append(")");
-    return buff.toString();
-  }
-
   public static class ClearReplyMessage extends ReplyMessage {
-    /** Result of the Clear operation */
-    boolean result;
-
     @Override
     public boolean getInlineProcess() {
       return true;
@@ -293,16 +226,21 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
 
     private ClearReplyMessage(int processorId, boolean result, ReplyException ex) {
       super();
-      this.result = result;
       setProcessorId(processorId);
-      setException(ex);
+      if (ex != null) {
+        setException(ex);
+      } else {
+        setReturnValue(result);
+      }
     }
 
-    /** Send an ack */
+    /**
+     * Send an ack
+     */
     public static void send(InternalDistributedMember recipient, int processorId,
         ReplySender replySender,
         boolean result, ReplyException ex) {
-      Assert.assertTrue(recipient != null, "ClearReplyMessage NULL reply message");
+      Assert.assertNotNull(recipient, "ClearReplyMessage recipient was NULL.");
       ClearReplyMessage message = new ClearReplyMessage(processorId, result, ex);
       message.setRecipient(recipient);
       replySender.putOutgoing(message);
@@ -340,23 +278,11 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
     }
 
     @Override
-    public void fromData(DataInput in,
-        DeserializationContext context) throws IOException, ClassNotFoundException {
-      super.fromData(in, context);
-      this.result = in.readBoolean();
-    }
-
-    @Override
-    public void toData(DataOutput out,
-        SerializationContext context) throws IOException {
-      super.toData(out, context);
-      out.writeBoolean(this.result);
-    }
-
-    @Override
     public String toString() {
-      return "ClearReplyMessage " + "processorid=" + this.processorId + " returning " + this.result
-          + " exception=" + getException();
+      StringBuilder stringBuilder = new StringBuilder(super.toString());
+      stringBuilder.append(" returnValue=");
+      stringBuilder.append(getReturnValue());
+      return stringBuilder.toString();
     }
   }
 
@@ -372,7 +298,9 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
     }
 
     public void setResponse(ClearReplyMessage response) {
-      this.returnValue = response.result;
+      if (response.getException() == null) {
+        this.returnValue = (boolean) response.getReturnValue();
+      }
     }
 
     /**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
index 2cf5231..acdd4fc 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
@@ -20,7 +20,6 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.notNull;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doNothing;
@@ -38,7 +37,6 @@ import java.util.Set;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionManager;
@@ -50,6 +48,7 @@ import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.PartitionedRegionDataStore;
 import org.apache.geode.internal.cache.PartitionedRegionStats;
+import org.apache.geode.internal.cache.RegionEventImpl;
 
 public class ClearPRMessageTest {
 
@@ -61,11 +60,14 @@ public class ClearPRMessageTest {
   @Before
   public void setup() throws ForceReattemptException {
     message = spy(new ClearPRMessage());
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
     region = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
     dataStore = mock(PartitionedRegionDataStore.class);
     when(region.getDataStore()).thenReturn(dataStore);
+    when(region.getFullPath()).thenReturn("/test");
     bucketRegion = mock(BucketRegion.class);
     when(dataStore.getInitializedBucketForId(any(), any())).thenReturn(bucketRegion);
+    RegionEventImpl bucketRegionEventImpl = mock(RegionEventImpl.class);
   }
 
   @Test
@@ -79,44 +81,19 @@ public class ClearPRMessageTest {
 
   @Test
   public void doLocalClearThrowsExceptionWhenLockCannotBeObtained() {
-    DistributedLockService mockLockService = mock(DistributedLockService.class);
-    doReturn(mockLockService).when(message).getPartitionRegionLockService();
-
-    when(mockLockService.lock(anyString(), anyLong(), anyLong())).thenReturn(false);
-    when(bucketRegion.isPrimary()).thenReturn(true);
-
-    assertThatThrownBy(() -> message.doLocalClear(region))
-        .isInstanceOf(ForceReattemptException.class)
-        .hasMessageContaining(ClearPRMessage.BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE);
-  }
-
-  @Test
-  public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAfterObtainingLock() {
-    DistributedLockService mockLockService = mock(DistributedLockService.class);
-    doReturn(mockLockService).when(message).getPartitionRegionLockService();
-
-    // Be primary on the first check, then be not primary on the second check
-    when(bucketRegion.isPrimary()).thenReturn(true).thenReturn(false);
-    when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+    when(bucketRegion.doLockForPrimary(false)).thenReturn(false);
 
     assertThatThrownBy(() -> message.doLocalClear(region))
         .isInstanceOf(ForceReattemptException.class)
         .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
-    // Confirm that we actually obtained and released the lock
-    verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong());
-    verify(mockLockService, times(1)).unlock(any());
   }
 
   @Test
   public void doLocalClearThrowsForceReattemptExceptionWhenAnExceptionIsThrownDuringClearOperation() {
-    DistributedLockService mockLockService = mock(DistributedLockService.class);
-    doReturn(mockLockService).when(message).getPartitionRegionLockService();
     NullPointerException exception = new NullPointerException("Error encountered");
     doThrow(exception).when(bucketRegion).cmnClearRegion(any(), anyBoolean(), anyBoolean());
 
-    // Be primary on the first check, then be not primary on the second check
-    when(bucketRegion.isPrimary()).thenReturn(true);
-    when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+    when(bucketRegion.doLockForPrimary(false)).thenReturn(true);
 
     assertThatThrownBy(() -> message.doLocalClear(region))
         .isInstanceOf(ForceReattemptException.class)
@@ -129,21 +106,13 @@ public class ClearPRMessageTest {
   @Test
   public void doLocalClearInvokesCmnClearRegionWhenBucketIsPrimaryAndLockIsObtained()
       throws ForceReattemptException {
-    DistributedLockService mockLockService = mock(DistributedLockService.class);
-    doReturn(mockLockService).when(message).getPartitionRegionLockService();
-
 
     // Be primary on the first check, then be not primary on the second check
-    when(bucketRegion.isPrimary()).thenReturn(true);
-    when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+    when(bucketRegion.doLockForPrimary(false)).thenReturn(true);
     assertThat(message.doLocalClear(region)).isTrue();
 
     // Confirm that cmnClearRegion was called
     verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean());
-
-    // Confirm that we actually obtained and released the lock
-    verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong());
-    verify(mockLockService, times(1)).unlock(any());
   }
 
   @Test
@@ -197,6 +166,7 @@ public class ClearPRMessageTest {
     int processorId = 1000;
     int startTime = 0;
 
+    doReturn(0).when(message).getBucketId();
     doReturn(true).when(message).doLocalClear(region);
     doReturn(sender).when(message).getSender();
     doReturn(processorId).when(message).getProcessorId();
@@ -206,8 +176,9 @@ public class ClearPRMessageTest {
     doNothing().when(message).sendReply(any(), anyInt(), any(), any(), any(), anyLong());
 
     message.operateOnPartitionedRegion(distributionManager, region, startTime);
+    assertThat(message.result).isTrue();
 
-    verify(message, times(1)).sendReply(sender, processorId, distributionManager, null, region,
+    verify(message, times(0)).sendReply(sender, processorId, distributionManager, null, region,
         startTime);
   }
 
@@ -222,6 +193,7 @@ public class ClearPRMessageTest {
     ForceReattemptException exception =
         new ForceReattemptException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
 
+    doReturn(0).when(message).getBucketId();
     doThrow(exception).when(message).doLocalClear(region);
     doReturn(sender).when(message).getSender();
     doReturn(processorId).when(message).getProcessorId();