You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2020/03/02 07:49:38 UTC

[geode] branch feature/GEODE-7682-2 created (now 25177d3)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a change to branch feature/GEODE-7682-2
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at 25177d3  GEODE-7682: add PR.clear API

This branch includes the following new commits:

     new 25177d3  GEODE-7682: add PR.clear API

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 01/01: GEODE-7682: add PR.clear API

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-7682-2
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 25177d3968f6140df3521d844769006da6fb795e
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Sun Mar 1 23:45:31 2020 -0800

    GEODE-7682: add PR.clear API
---
 .../cache/PartitionedRegionClearDUnitTest.java     | 133 ++++++++++++++
 ...itionedRegionSingleNodeOperationsJUnitTest.java |   7 +-
 .../codeAnalysis/sanctionedDataSerializables.txt   |   6 +-
 .../org/apache/geode/internal/DSFIDFactory.java    |   3 +
 .../geode/internal/cache/DistributedRegion.java    |   9 -
 .../apache/geode/internal/cache/LocalRegion.java   |  10 +
 .../geode/internal/cache/PartitionedRegion.java    | 203 ++++++++++++++++++++-
 .../geode/internal/cache/RegionEventImpl.java      |   5 +
 .../internal/cache/partitioned/ClearPRMessage.java |  76 ++++----
 .../cache/partitioned/ClearPRMessageTest.java      |  60 ++----
 10 files changed, 398 insertions(+), 114 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
new file mode 100644
index 0000000..fd1a10b
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.stream.IntStream;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+
+public class PartitionedRegionClearDUnitTest implements Serializable {
+  protected static final String REGION_NAME = "testPR";
+  protected static final int NUM_ENTRIES = 1000;
+
+  protected int locatorPort;
+  protected MemberVM locator;
+  protected MemberVM dataStore1, dataStore2, accessor;
+  protected ClientVM client1, client2;
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule(6);
+
+  @Before
+  public void setUp() throws Exception {
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+    dataStore1 = cluster.startServerVM(1, locatorPort);
+    dataStore2 = cluster.startServerVM(2, locatorPort);
+    accessor = cluster.startServerVM(3, locatorPort);
+    client1 = cluster.startClientVM(4, c -> c.withLocatorConnection((locatorPort)));
+    client2 = cluster.startClientVM(5, c -> c.withLocatorConnection((locatorPort)));
+
+    prepare();
+  }
+
+  private void prepare() {
+    dataStore1.invoke(this::initDataStore);
+    dataStore2.invoke(this::initDataStore);
+    accessor.invoke(this::initAccessor);
+
+    accessor.invoke(this::insertEntries);
+    dataStore1.invoke(this::verifyDataIsLoaded);
+    dataStore2.invoke(this::verifyDataIsLoaded);
+  }
+
+  private void verifyDataIsLoaded() {
+    Region region = getCache().getRegion(REGION_NAME);
+    assertThat(region.size()).isEqualTo(NUM_ENTRIES);
+  }
+
+  public Cache getCache() {
+    Cache cache = ClusterStartupRule.getCache();
+    assertThat(cache).isNotNull();
+
+    return cache;
+  }
+
+  private void initDataStore() {
+    Cache cache = getCache();
+    cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT)
+        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create())
+        .create(REGION_NAME);
+  }
+
+  private void initAccessor() {
+    Cache cache = getCache();
+    cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT)
+        .setPartitionAttributes(
+            new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create())
+        .create(REGION_NAME);
+  }
+
+  private void insertEntries() {
+    Cache cache = getCache();
+    Region region = cache.getRegion(REGION_NAME);
+    IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, "value" + i));
+    assertThat(region.size()).isEqualTo(NUM_ENTRIES);
+  }
+
+  @Test
+  public void normalClearFromDataStore() {
+    dataStore1.invoke(() -> {
+      Region region = getCache().getRegion(REGION_NAME);
+      assertThat(region.size()).isEqualTo(NUM_ENTRIES);
+
+      region.clear();
+      assertThat(region.size()).isEqualTo(0);
+    });
+    dataStore2.invoke(() -> {
+      Region region = getCache().getRegion(REGION_NAME);
+      assertThat(region.size()).isEqualTo(0);
+    });
+  }
+
+  @Test
+  public void normalClearFromAccessor() {
+    accessor.invoke(() -> {
+      Region region = getCache().getRegion(REGION_NAME);
+      assertThat(region.size()).isEqualTo(NUM_ENTRIES);
+      region.clear();
+
+      assertThat(region.size()).isEqualTo(0);
+    });
+    dataStore2.invoke(() -> {
+      Region region = getCache().getRegion(REGION_NAME);
+      assertThat(region.size()).isEqualTo(0);
+    });
+  }
+}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java
index e8de2b5..efda321 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java
@@ -1309,12 +1309,7 @@ public class PartitionedRegionSingleNodeOperationsJUnitTest {
       pr.put(new Integer(3), "three");
       pr.getEntry("key");
 
-      try {
-        pr.clear();
-        fail(
-            "PartitionedRegionSingleNodeOperationTest:testUnSupportedOps() operation failed on a blank PartitionedRegion");
-      } catch (UnsupportedOperationException expected) {
-      }
+      pr.clear();
 
       // try {
       // pr.entries(true);
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index d6806f2..cacc46b 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1436,8 +1436,8 @@ fromData,27
 toData,27
 
 org/apache/geode/internal/cache/partitioned/ClearPRMessage,2
-fromData,30
-toData,44
+fromData,19
+toData,36
 
 org/apache/geode/internal/cache/partitioned/ClearPRMessage$ClearReplyMessage,2
 fromData,17
@@ -2101,4 +2101,4 @@ toData,105
 
 org/apache/geode/pdx/internal/PdxType,2
 fromData,109
-toData,124
\ No newline at end of file
+toData,124
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index 81ee359..36d4bd5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -284,6 +284,7 @@ import org.apache.geode.internal.cache.partitioned.BucketCountLoadProbe;
 import org.apache.geode.internal.cache.partitioned.BucketProfileUpdateMessage;
 import org.apache.geode.internal.cache.partitioned.BucketSizeMessage;
 import org.apache.geode.internal.cache.partitioned.BucketSizeMessage.BucketSizeReplyMessage;
+import org.apache.geode.internal.cache.partitioned.ClearPRMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueReplyMessage;
 import org.apache.geode.internal.cache.partitioned.CreateBucketMessage;
@@ -972,6 +973,8 @@ public class DSFIDFactory implements DataSerializableFixedID {
     serializer.registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY,
         GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationEntry.class);
     serializer.registerDSFID(ABORT_BACKUP_REQUEST, AbortBackupRequest.class);
+    serializer.registerDSFID(PR_CLEAR_MESSAGE, ClearPRMessage.class);
+    serializer.registerDSFID(PR_CLEAR_REPLY_MESSAGE, ClearPRMessage.ClearReplyMessage.class);
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 1a62919..900d85e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -189,10 +189,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
   @MutableForTesting
   public static boolean ignoreReconnect = false;
 
-  /**
-   * Lock to prevent multiple threads on this member from performing a clear at the same time.
-   */
-  private final Object clearLock = new Object();
   private final ReentrantReadWriteLock failedInitialImageLock = new ReentrantReadWriteLock(true);
 
   @MakeNotStatic
@@ -927,11 +923,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
     }
   }
 
-  private void lockCheckReadiness() {
-    cache.getCancelCriterion().checkCancelInProgress(null);
-    checkReadiness();
-  }
-
   @Override
   Object validatedDestroy(Object key, EntryEventImpl event)
       throws TimeoutException, EntryNotFoundException, CacheWriterException {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 6a7e2d2..d5f9156 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -473,6 +473,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
   private final Lock clientMetaDataLock = new ReentrantLock();
 
   /**
+   * Lock to prevent multiple threads on this member from performing a clear at the same time.
+   */
+  protected final Object clearLock = new Object();
+
+  /**
    * Lock for updating the cache service profile for the region.
    */
   private final Lock cacheServiceProfileUpdateLock = new ReentrantLock();
@@ -2750,6 +2755,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     checkRegionDestroyed(true);
   }
 
+  protected void lockCheckReadiness() {
+    cache.getCancelCriterion().checkCancelInProgress(null);
+    checkReadiness();
+  }
+
   /**
    * This method should be called when the caller cannot locate an entry and that condition is
    * unexpected. This will first double check the cache and region state before throwing an
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 2c1ec04..c867225 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -179,6 +179,7 @@ import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultWa
 import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
 import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
+import org.apache.geode.internal.cache.partitioned.ClearPRMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueResponse;
 import org.apache.geode.internal.cache.partitioned.DestroyMessage;
@@ -2150,12 +2151,200 @@ public class PartitionedRegion extends LocalRegion
    */
   @Override
   public void clear() {
-    throw new UnsupportedOperationException();
+    super.clear();
   }
 
   @Override
   void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) {
-    throw new UnsupportedOperationException();
+    synchronized (clearLock) {
+      final DistributedLockService lockService = getPartitionedRegionLockService();
+      try {
+        lockService.lock("_clearOperation", -1, -1);
+      } catch (IllegalStateException e) {
+        lockCheckReadiness();
+      }
+      try {
+        if (cache.isCacheAtShutdownAll()) {
+          throw cache.getCacheClosedException("Cache is shutting down");
+        }
+
+        // create ClearPRMessage per bucket
+        ArrayList<ClearPRMessage> clearMsgList =
+            (ArrayList<ClearPRMessage>) createClearPRMessages();
+        for (ClearPRMessage clearPRMessage : clearMsgList) {
+          int bucketId = clearPRMessage.getBucketId();
+          checkReadiness();
+          long then = 0;
+          try {
+            sendClearMsgByBucket(bucketId, clearPRMessage);
+          } catch (PartitionOfflineException poe) {
+            // TODO add a PartialResultException
+            logger.info("PR.sendClearMsgByBucket encountered PartitionOfflineException at bucket "
+                + bucketId, poe);
+          } catch (Exception e) {
+            logger.info("PR.sendClearMsgByBucket encountered exception at bucket " + bucketId, e);
+          }
+
+          if (logger.isDebugEnabled()) {
+            long now = System.currentTimeMillis();
+            if (now - then > 10000) {
+              logger.debug("PR.sendClearMsgByBucket for bucket {} took {} ms", bucketId,
+                  (now - then));
+            }
+          }
+          // TODO add psStats
+        }
+      } finally {
+        try {
+          lockService.unlock("_clearOperation");
+        } catch (IllegalStateException e) {
+          lockCheckReadiness();
+        }
+      }
+
+      // notify bridge clients at PR level
+      notifyBridgeClients(regionEvent);
+    }
+  }
+
+  void sendClearMsgByBucket(final Integer bucketId, ClearPRMessage clearPRMessage) {
+    RetryTimeKeeper retryTime = null;
+    InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId, null);
+    if (logger.isDebugEnabled()) {
+      logger.debug("PR.sendClearMsgByBucket:bucket {}'s currentTarget is {}", bucketId,
+          currentTarget);
+    }
+
+    long timeOut = 0;
+    int count = 0;
+    for (;;) {
+      switch (count) {
+        case 0:
+          // Note we don't check for DM cancellation in common case.
+          // First time. Assume success, keep going.
+          break;
+        case 1:
+          this.cache.getCancelCriterion().checkCancelInProgress(null);
+          // Second time (first failure). Calculate timeout and keep going.
+          timeOut = System.currentTimeMillis() + this.retryTimeout;
+          break;
+        default:
+          this.cache.getCancelCriterion().checkCancelInProgress(null);
+          // test for timeout
+          long timeLeft = timeOut - System.currentTimeMillis();
+          if (timeLeft < 0) {
+            PRHARedundancyProvider.timedOut(this, null, null, "clear a bucket" + bucketId,
+                this.retryTimeout);
+            // NOTREACHED
+          }
+
+          // Didn't time out. Sleep a bit and then continue
+          boolean interrupted = Thread.interrupted();
+          try {
+            Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION);
+          } catch (InterruptedException ignore) {
+            interrupted = true;
+          } finally {
+            if (interrupted) {
+              Thread.currentThread().interrupt();
+            }
+          }
+          break;
+      } // switch
+      count++;
+
+      if (currentTarget == null) { // pick target
+        checkReadiness();
+        if (retryTime == null) {
+          retryTime = new RetryTimeKeeper(this.retryTimeout);
+        }
+
+        currentTarget = waitForNodeOrCreateBucket(retryTime, null, bucketId, false);
+        if (currentTarget == null) {
+          // the bucket does not exist, no need to clear
+          logger.info("Bucket " + bucketId + " does not contain data, no need to clear");
+          return;
+        } else {
+          if (logger.isDebugEnabled()) {
+            logger.debug("PR.sendClearMsgByBucket: new currentTarget is {}", currentTarget);
+          }
+        }
+
+        // It's possible this is a GemFire thread e.g. ServerConnection
+        // which got to this point because of a distributed system shutdown or
+        // region closure which uses interrupt to break any sleep() or wait() calls
+        // e.g. waitForPrimary or waitForBucketRecovery in which case throw exception
+        checkShutdown();
+        continue;
+      } // pick target
+
+      boolean result = false;
+      try {
+        final boolean isLocal = (this.localMaxMemory > 0) && currentTarget.equals(getMyId());
+        if (isLocal) {
+          result = clearPRMessage.doLocalClear(this, bucketId);
+        } else {
+          ClearPRMessage.ClearResponse response = clearPRMessage.send(currentTarget, this);
+          if (response != null) {
+            this.prStats.incPartitionMessagesSent();
+            result = response.waitForResult();
+          }
+        }
+        if (result) {
+          return;
+        }
+      } catch (ForceReattemptException prce) {
+        checkReadiness();
+        InternalDistributedMember lastTarget = currentTarget;
+        if (retryTime == null) {
+          retryTime = new RetryTimeKeeper(this.retryTimeout);
+        }
+        currentTarget = getNodeForBucketWrite(bucketId, retryTime);
+        if (logger.isDebugEnabled()) {
+          logger.debug("PR.sendMsgByBucket: Old target was {}, Retrying {}", lastTarget,
+              currentTarget);
+        }
+        if (lastTarget.equals(currentTarget)) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("PR.sendClearMsgByBucket: Retrying at the same node:{} due to {}",
+                currentTarget, prce.getMessage());
+          }
+          if (retryTime.overMaximum()) {
+            PRHARedundancyProvider.timedOut(this, null, null, "update an entry",
+                this.retryTimeout);
+            // NOTREACHED
+          }
+          retryTime.waitToRetryNode();
+        }
+      }
+
+      // It's possible this is a GemFire thread e.g. ServerConnection
+      // which got to this point because of a distributed system shutdown or
+      // region closure which uses interrupt to break any sleep() or wait()
+      // calls
+      // e.g. waitForPrimary or waitForBucketRecovery in which case throw
+      // exception
+      checkShutdown();
+
+      // If we get here, the attempt failed...
+      if (count == 1) {
+        // TODO prStats add ClearPRMsg retried
+        this.prStats.incPutAllMsgsRetried();
+      }
+    }
+  }
+
+  List createClearPRMessages() {
+    if (cache.isCacheAtShutdownAll()) {
+      throw cache.getCacheClosedException("Cache is shutting down");
+    }
+
+    ArrayList<ClearPRMessage> clearMsgList = new ArrayList<>();
+    for (int bucketId = 0; bucketId < this.totalNumberOfBuckets; bucketId++) {
+      ClearPRMessage clearPRMessage = new ClearPRMessage(bucketId);
+      clearMsgList.add(clearPRMessage);
+    }
+    return clearMsgList;
   }
 
   @Override
@@ -2574,7 +2763,7 @@ public class PartitionedRegion extends LocalRegion
             retryTime = new RetryTimeKeeper(this.retryTimeout);
           }
 
-          currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId);
+          currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true);
           if (isDebugEnabled) {
             logger.debug("PR.sendMsgByBucket: event size is {}, new currentTarget is {}",
                 getEntrySize(event), currentTarget);
@@ -2715,7 +2904,7 @@ public class PartitionedRegion extends LocalRegion
             retryTime = new RetryTimeKeeper(this.retryTimeout);
           }
 
-          currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId);
+          currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true);
           if (logger.isDebugEnabled()) {
             logger.debug("PR.sendMsgByBucket: event size is {}, new currentTarget is {}",
                 getEntrySize(event), currentTarget);
@@ -2960,7 +3149,7 @@ public class PartitionedRegion extends LocalRegion
         if (retryTime == null) {
           retryTime = new RetryTimeKeeper(this.retryTimeout);
         }
-        currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId);
+        currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true);
 
         // It's possible this is a GemFire thread e.g. ServerConnection
         // which got to this point because of a distributed system shutdown or
@@ -3122,7 +3311,7 @@ public class PartitionedRegion extends LocalRegion
    * @return a Node which contains the bucket, potentially null
    */
   private InternalDistributedMember waitForNodeOrCreateBucket(RetryTimeKeeper retryTime,
-      EntryEventImpl event, Integer bucketId) {
+      EntryEventImpl event, Integer bucketId, boolean createIfNotExist) {
     InternalDistributedMember newNode;
     if (retryTime.overMaximum()) {
       PRHARedundancyProvider.timedOut(this, null, null, "allocate a bucket",
@@ -3132,7 +3321,7 @@ public class PartitionedRegion extends LocalRegion
 
     retryTime.waitForBucketsRecovery();
     newNode = getNodeForBucketWrite(bucketId, retryTime);
-    if (newNode == null) {
+    if (newNode == null && createIfNotExist) {
       newNode = createBucket(bucketId, getEntrySize(event), retryTime);
     }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
index 402b7f2..f155a7e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
@@ -119,6 +119,11 @@ public class RegionEventImpl
     return region;
   }
 
+  public void setRegion(LocalRegion region) {
+    this.region = region;
+    this.distributedMember = region.getMyId();
+  }
+
   @Override
   public Operation getOperation() {
     return this.op;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
index 1a8aba1..15b281b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
@@ -26,6 +26,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.Operation;
 import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
@@ -54,11 +55,11 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
 public class ClearPRMessage extends PartitionMessageWithDirectReply {
   private static final Logger logger = LogService.getLogger();
 
-  private RegionEventImpl regionEvent;
-
   private Integer bucketId;
 
-  /** The time in ms to wait for a lock to be obtained during doLocalClear() */
+  /**
+   * The time in ms to wait for a lock to be obtained during doLocalClear()
+   */
   public static final int LOCK_WAIT_TIMEOUT_MS = 1000;
   public static final String BUCKET_NON_PRIMARY_MESSAGE =
       "The bucket region on target member is no longer primary";
@@ -85,10 +86,6 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
     this.posDup = false;
   }
 
-  public void setRegionEvent(RegionEventImpl event) {
-    regionEvent = event;
-  }
-
   public void initMessage(PartitionedRegion region, Set<InternalDistributedMember> recipients,
       DirectReplyProcessor replyProcessor) {
     this.resetRecipients();
@@ -109,14 +106,9 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
     return true;
   }
 
-  public RegionEventImpl getRegionEvent() {
-    return regionEvent;
-  }
-
   public ClearResponse send(DistributedMember recipient, PartitionedRegion region)
       throws ForceReattemptException {
-    Set<InternalDistributedMember> recipients =
-        Collections.singleton((InternalDistributedMember) recipient);
+    Set recipients = Collections.singleton(recipient);
     ClearResponse clearResponse = new ClearResponse(region.getSystem(), recipients);
     initMessage(region, recipients, clearResponse);
     if (logger.isDebugEnabled()) {
@@ -143,7 +135,6 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
     } else {
       InternalDataSerializer.writeSignedVL(bucketId, out);
     }
-    DataSerializer.writeObject(regionEvent, out);
   }
 
   @Override
@@ -151,12 +142,11 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
       throws IOException, ClassNotFoundException {
     super.fromData(in, context);
     this.bucketId = (int) InternalDataSerializer.readSignedVL(in);
-    this.regionEvent = DataSerializer.readObject(in);
   }
 
   @Override
   public EventID getEventID() {
-    return regionEvent.getEventId();
+    return null;
   }
 
   /**
@@ -169,7 +159,7 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
   protected boolean operateOnPartitionedRegion(ClusterDistributionManager distributionManager,
       PartitionedRegion region, long startTime) {
     try {
-      result = doLocalClear(region);
+      result = doLocalClear(region, getBucketId());
     } catch (ForceReattemptException ex) {
       sendReply(getSender(), getProcessorId(), distributionManager, new ReplyException(ex), region,
           startTime);
@@ -179,39 +169,31 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
     return false;
   }
 
-  public boolean doLocalClear(PartitionedRegion region) throws ForceReattemptException {
+  public int getBucketId() {
+    return this.bucketId;
+  }
+
+  public boolean doLocalClear(PartitionedRegion region, int bucketId)
+      throws ForceReattemptException {
     // Retrieve local bucket region which matches target bucketId
-    BucketRegion bucketRegion = region.getDataStore().getInitializedBucketForId(null, bucketId);
+    BucketRegion bucketRegion =
+        region.getDataStore().getInitializedBucketForId(null, this.bucketId);
 
+    boolean lockedForPrimary = bucketRegion.doLockForPrimary(false);
     // Check if we are primary, throw exception if not
-    if (!bucketRegion.isPrimary()) {
+    if (!lockedForPrimary) {
       throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
     }
-
-    DistributedLockService lockService = getPartitionRegionLockService();
-    String lockName = bucketRegion.getFullPath();
     try {
-      boolean locked = lockService.lock(lockName, LOCK_WAIT_TIMEOUT_MS, -1);
-
-      if (!locked) {
-        throw new ForceReattemptException(BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE);
-      }
-
-      // Double check if we are still primary, as this could have changed between our first check
-      // and obtaining the lock
-      if (!bucketRegion.isPrimary()) {
-        throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
-      }
-
-      try {
-        bucketRegion.cmnClearRegion(regionEvent, true, true);
-      } catch (Exception ex) {
-        throw new ForceReattemptException(
-            EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex);
-      }
-
+      RegionEventImpl regionEvent = new RegionEventImpl();
+      regionEvent.setOperation(Operation.REGION_CLEAR);
+      regionEvent.setRegion(bucketRegion);
+      bucketRegion.cmnClearRegion(regionEvent, true, true);
+    } catch (Exception ex) {
+      throw new ForceReattemptException(
+          EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex);
     } finally {
-      lockService.unlock(lockName);
+      bucketRegion.doUnlockForPrimary();
     }
 
     return true;
@@ -277,7 +259,9 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
   }
 
   public static class ClearReplyMessage extends ReplyMessage {
-    /** Result of the Clear operation */
+    /**
+     * Result of the Clear operation
+     */
     boolean result;
 
     @Override
@@ -298,7 +282,9 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
       setException(ex);
     }
 
-    /** Send an ack */
+    /**
+     * Send an ack
+     */
     public static void send(InternalDistributedMember recipient, int processorId,
         ReplySender replySender,
         boolean result, ReplyException ex) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
index 2cf5231..3568ff0 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
@@ -20,7 +20,6 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.notNull;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doNothing;
@@ -38,7 +37,6 @@ import java.util.Set;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionManager;
@@ -50,6 +48,7 @@ import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.PartitionedRegionDataStore;
 import org.apache.geode.internal.cache.PartitionedRegionStats;
+import org.apache.geode.internal.cache.RegionEventImpl;
 
 public class ClearPRMessageTest {
 
@@ -61,64 +60,43 @@ public class ClearPRMessageTest {
   @Before
   public void setup() throws ForceReattemptException {
     message = spy(new ClearPRMessage());
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
     region = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
     dataStore = mock(PartitionedRegionDataStore.class);
     when(region.getDataStore()).thenReturn(dataStore);
+    when(region.getFullPath()).thenReturn("/test");
     bucketRegion = mock(BucketRegion.class);
     when(dataStore.getInitializedBucketForId(any(), any())).thenReturn(bucketRegion);
+    RegionEventImpl bucketRegionEventImpl = mock(RegionEventImpl.class);
+    // RegionEventImpl(bucketRegion, Operation.REGION_CLEAR, null, false, member, true);
   }
 
   @Test
   public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAtFirstCheck() {
     when(bucketRegion.isPrimary()).thenReturn(false);
 
-    assertThatThrownBy(() -> message.doLocalClear(region))
+    assertThatThrownBy(() -> message.doLocalClear(region, 0))
         .isInstanceOf(ForceReattemptException.class)
         .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
   }
 
   @Test
   public void doLocalClearThrowsExceptionWhenLockCannotBeObtained() {
-    DistributedLockService mockLockService = mock(DistributedLockService.class);
-    doReturn(mockLockService).when(message).getPartitionRegionLockService();
+    when(bucketRegion.doLockForPrimary(false)).thenReturn(false);
 
-    when(mockLockService.lock(anyString(), anyLong(), anyLong())).thenReturn(false);
-    when(bucketRegion.isPrimary()).thenReturn(true);
-
-    assertThatThrownBy(() -> message.doLocalClear(region))
-        .isInstanceOf(ForceReattemptException.class)
-        .hasMessageContaining(ClearPRMessage.BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE);
-  }
-
-  @Test
-  public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAfterObtainingLock() {
-    DistributedLockService mockLockService = mock(DistributedLockService.class);
-    doReturn(mockLockService).when(message).getPartitionRegionLockService();
-
-    // Be primary on the first check, then be not primary on the second check
-    when(bucketRegion.isPrimary()).thenReturn(true).thenReturn(false);
-    when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
-
-    assertThatThrownBy(() -> message.doLocalClear(region))
+    assertThatThrownBy(() -> message.doLocalClear(region, 0))
         .isInstanceOf(ForceReattemptException.class)
         .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
-    // Confirm that we actually obtained and released the lock
-    verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong());
-    verify(mockLockService, times(1)).unlock(any());
   }
 
   @Test
   public void doLocalClearThrowsForceReattemptExceptionWhenAnExceptionIsThrownDuringClearOperation() {
-    DistributedLockService mockLockService = mock(DistributedLockService.class);
-    doReturn(mockLockService).when(message).getPartitionRegionLockService();
     NullPointerException exception = new NullPointerException("Error encountered");
     doThrow(exception).when(bucketRegion).cmnClearRegion(any(), anyBoolean(), anyBoolean());
 
-    // Be primary on the first check, then be not primary on the second check
-    when(bucketRegion.isPrimary()).thenReturn(true);
-    when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+    when(bucketRegion.doLockForPrimary(false)).thenReturn(true);
 
-    assertThatThrownBy(() -> message.doLocalClear(region))
+    assertThatThrownBy(() -> message.doLocalClear(region, bucketRegion.getId()))
         .isInstanceOf(ForceReattemptException.class)
         .hasMessageContaining(ClearPRMessage.EXCEPTION_THROWN_DURING_CLEAR_OPERATION);
 
@@ -129,21 +107,13 @@ public class ClearPRMessageTest {
   @Test
   public void doLocalClearInvokesCmnClearRegionWhenBucketIsPrimaryAndLockIsObtained()
       throws ForceReattemptException {
-    DistributedLockService mockLockService = mock(DistributedLockService.class);
-    doReturn(mockLockService).when(message).getPartitionRegionLockService();
-
 
     // Be primary on the first check, then be not primary on the second check
-    when(bucketRegion.isPrimary()).thenReturn(true);
-    when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
-    assertThat(message.doLocalClear(region)).isTrue();
+    when(bucketRegion.doLockForPrimary(false)).thenReturn(true);
+    assertThat(message.doLocalClear(region, 0)).isTrue();
 
     // Confirm that cmnClearRegion was called
     verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean());
-
-    // Confirm that we actually obtained and released the lock
-    verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong());
-    verify(mockLockService, times(1)).unlock(any());
   }
 
   @Test
@@ -197,7 +167,8 @@ public class ClearPRMessageTest {
     int processorId = 1000;
     int startTime = 0;
 
-    doReturn(true).when(message).doLocalClear(region);
+    doReturn(0).when(message).getBucketId();
+    doReturn(true).when(message).doLocalClear(region, 0);
     doReturn(sender).when(message).getSender();
     doReturn(processorId).when(message).getProcessorId();
 
@@ -222,7 +193,8 @@ public class ClearPRMessageTest {
     ForceReattemptException exception =
         new ForceReattemptException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
 
-    doThrow(exception).when(message).doLocalClear(region);
+    doReturn(0).when(message).getBucketId();
+    doThrow(exception).when(message).doLocalClear(region, 0);
     doReturn(sender).when(message).getSender();
     doReturn(processorId).when(message).getProcessorId();