You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2020/10/02 19:28:47 UTC

[geode] 01/19: GEODE-7683: introduce BR.cmnClearRegion

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 7ff925adb9d7def42c3e197b1c459efadf4bffb6
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Mon Jan 27 17:02:48 2020 -0800

    GEODE-7683: introduce BR.cmnClearRegion
    
    Co-authored-by: Xiaojian Zhou <gz...@pivotal.io>
    
    GEODE-7684: Create messaging class for PR Clear (#4689)
    
    * Added new message class and test
    
    Co-authored-by: Benjamin Ross <br...@pivotal.io>
    Co-authored-by: Donal Evans <do...@pivotal.io>
---
 .../codeAnalysis/sanctionedDataSerializables.txt   |   8 +
 .../apache/geode/internal/cache/BucketRegion.java  |  34 +-
 .../geode/internal/cache/DistributedRegion.java    |  23 +-
 .../internal/cache/partitioned/ClearPRMessage.java | 388 +++++++++++++++++++++
 .../internal/cache/BucketRegionJUnitTest.java      |  77 ++++
 .../internal/cache/DistributedRegionJUnitTest.java |  18 +
 .../cache/partitioned/ClearPRMessageTest.java      | 288 +++++++++++++++
 .../serialization/DataSerializableFixedID.java     |   3 +
 8 files changed, 831 insertions(+), 8 deletions(-)

diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index cbc995d..a230c75 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1376,6 +1376,14 @@ org/apache/geode/internal/cache/partitioned/BucketSizeMessage$BucketSizeReplyMes
 fromData,27
 toData,27
 
+org/apache/geode/internal/cache/partitioned/ClearPRMessage,2
+fromData,30
+toData,44
+
+org/apache/geode/internal/cache/partitioned/ClearPRMessage$ClearReplyMessage,2
+fromData,17
+toData,17
+
 org/apache/geode/internal/cache/partitioned/ColocatedRegionDetails,2
 fromData,81
 toData,133
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 8662a6e..d49d3dc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -560,6 +560,36 @@ public class BucketRegion extends DistributedRegion implements Bucket {
     }
   }
 
+  @Override
+  public void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) {
+    if (!getBucketAdvisor().isPrimary()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Not primary bucket when doing clear, do nothing");
+      }
+      return;
+    }
+
+    boolean enableRVV = useRVV && getConcurrencyChecksEnabled();
+    RegionVersionVector rvv = null;
+    if (enableRVV) {
+      rvv = getVersionVector().getCloneForTransmission();
+    }
+
+    // get rvvLock; participants are the members advised for region invalidation
+    Set<InternalDistributedMember> participants =
+        getCacheDistributionAdvisor().adviseInvalidateRegion();
+    try {
+      obtainWriteLocksForClear(regionEvent, participants);
+      // No need to dominate my own rvv, so null is passed as the rvv for the local clear.
+      // Clear is ongoing here, so there won't be a GII for this member.
+      clearRegionLocally(regionEvent, cacheWrite, null);
+      distributeClearOperation(regionEvent, rvv, participants);
+
+      // TODO: call reindexUserDataRegion if there are Lucene indexes
+    } finally {
+      releaseWriteLocksForClear(regionEvent, participants);
+    }
+  }
 
   long generateTailKey() {
     long key = eventSeqNum.addAndGet(partitionedRegion.getTotalNumberOfBuckets());
@@ -2110,8 +2140,8 @@ public class BucketRegion extends DistributedRegion implements Bucket {
       // counters to 0.
       oldMemValue = bytesInMemory.getAndSet(0);
     } else {
-      throw new InternalGemFireError(
-          "Trying to clear a bucket region that was not destroyed or in initialization.");
+      // BucketRegion's clear is supported now
+      oldMemValue = bytesInMemory.getAndSet(0);
     }
     if (oldMemValue != BUCKET_DESTROYED) {
       partitionedRegion.getPrStats().incDataStoreEntryCount(-sizeBeforeClear);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index b822dde..489d85a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2013,6 +2013,10 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
     super.basicClear(regionEvent, cacheWrite);
   }
 
+  void distributeClearOperation(RegionEventImpl regionEvent, RegionVersionVector rvv,
+      Set<InternalDistributedMember> participants) {
+    DistributedClearOperation.clear(regionEvent, rvv, participants);
+  }
 
   @Override
   void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) {
@@ -2035,7 +2039,7 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
             obtainWriteLocksForClear(regionEvent, participants);
             clearRegionLocally(regionEvent, cacheWrite, null);
             if (!regionEvent.isOriginRemote() && regionEvent.getOperation().isDistributed()) {
-              DistributedClearOperation.clear(regionEvent, null, participants);
+              distributeClearOperation(regionEvent, null, participants);
             }
           } finally {
             releaseWriteLocksForClear(regionEvent, participants);
@@ -2091,10 +2095,12 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
   /**
    * obtain locks preventing generation of new versions in other members
    */
-  private void obtainWriteLocksForClear(RegionEventImpl regionEvent,
+  protected void obtainWriteLocksForClear(RegionEventImpl regionEvent,
       Set<InternalDistributedMember> participants) {
     lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent);
-    DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants);
+    if (!isUsedForPartitionedRegionBucket()) {
+      DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants);
+    }
   }
 
   /**
@@ -2131,7 +2137,7 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
   /**
    * releases the locks obtained in obtainWriteLocksForClear
    */
-  private void releaseWriteLocksForClear(RegionEventImpl regionEvent,
+  protected void releaseWriteLocksForClear(RegionEventImpl regionEvent,
       Set<InternalDistributedMember> participants) {
 
     ARMLockTestHook armLockTestHook = getRegionMap().getARMLockTestHook();
@@ -2139,8 +2145,13 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
       armLockTestHook.beforeRelease(this, regionEvent);
     }
 
-    getVersionVector().unlockForClear(getMyId());
-    DistributedClearOperation.releaseLocks(regionEvent, participants);
+    RegionVersionVector rvv = getVersionVector();
+    if (rvv != null) {
+      rvv.unlockForClear(getMyId());
+    }
+    if (!isUsedForPartitionedRegionBucket()) {
+      DistributedClearOperation.releaseLocks(regionEvent, participants);
+    }
 
     if (armLockTestHook != null) {
       armLockTestHook.afterRelease(this, regionEvent);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
new file mode 100644
index 0000000..1a8aba1
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.partitioned;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.distributed.DistributedLockService;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DirectReplyProcessor;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.distributed.internal.ReplySender;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.InternalDataSerializer;
+import org.apache.geode.internal.NanoTimer;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.ForceReattemptException;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.internal.cache.RegionEventImpl;
+import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+public class ClearPRMessage extends PartitionMessageWithDirectReply {
+  private static final Logger logger = LogService.getLogger();
+
+  private RegionEventImpl regionEvent;
+
+  private Integer bucketId;
+
+  /** The time in ms to wait for a lock to be obtained during doLocalClear() */
+  public static final int LOCK_WAIT_TIMEOUT_MS = 1000;
+  public static final String BUCKET_NON_PRIMARY_MESSAGE =
+      "The bucket region on target member is no longer primary";
+  public static final String BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE =
+      "A lock for the bucket region could not be obtained.";
+  public static final String EXCEPTION_THROWN_DURING_CLEAR_OPERATION =
+      "An exception was thrown during the local clear operation: ";
+
+  /**
+   * state from operateOnRegion that must be preserved for transmission from the waiting pool
+   */
+  transient boolean result = false;
+
+  /**
+   * Empty constructor to satisfy {@link DataSerializer} requirements
+   */
+  public ClearPRMessage() {}
+
+  public ClearPRMessage(int bucketId) {
+    this.bucketId = bucketId;
+
+    // These are both used by the parent class, but don't apply to this message type
+    this.notificationOnly = false;
+    this.posDup = false;
+  }
+
+  public void setRegionEvent(RegionEventImpl event) {
+    regionEvent = event;
+  }
+
+  public void initMessage(PartitionedRegion region, Set<InternalDistributedMember> recipients,
+      DirectReplyProcessor replyProcessor) {
+    this.resetRecipients();
+    if (recipients != null) {
+      setRecipients(recipients);
+    }
+    this.regionId = region.getPRId();
+    this.processor = replyProcessor;
+    this.processorId = replyProcessor == null ? 0 : replyProcessor.getProcessorId();
+    if (replyProcessor != null) {
+      replyProcessor.enableSevereAlertProcessing();
+    }
+  }
+
+  @Override
+  public boolean isSevereAlertCompatible() {
+    // allow forced-disconnect processing for all cache op messages
+    return true;
+  }
+
+  public RegionEventImpl getRegionEvent() {
+    return regionEvent;
+  }
+
+  public ClearResponse send(DistributedMember recipient, PartitionedRegion region)
+      throws ForceReattemptException {
+    Set<InternalDistributedMember> recipients =
+        Collections.singleton((InternalDistributedMember) recipient);
+    ClearResponse clearResponse = new ClearResponse(region.getSystem(), recipients);
+    initMessage(region, recipients, clearResponse);
+    if (logger.isDebugEnabled()) {
+      logger.debug("ClearPRMessage.send: recipient is {}, msg is {}", recipient, this);
+    }
+
+    Set<InternalDistributedMember> failures = region.getDistributionManager().putOutgoing(this);
+    if (failures != null && failures.size() > 0) {
+      throw new ForceReattemptException("Failed sending <" + this + ">");
+    }
+    return clearResponse;
+  }
+
+  @Override
+  public int getDSFID() {
+    return PR_CLEAR_MESSAGE;
+  }
+
+  @Override
+  public void toData(DataOutput out, SerializationContext context) throws IOException {
+    super.toData(out, context);
+    if (bucketId == null) {
+      InternalDataSerializer.writeSignedVL(-1, out);
+    } else {
+      InternalDataSerializer.writeSignedVL(bucketId, out);
+    }
+    DataSerializer.writeObject(regionEvent, out);
+  }
+
+  @Override
+  public void fromData(DataInput in, DeserializationContext context)
+      throws IOException, ClassNotFoundException {
+    super.fromData(in, context);
+    this.bucketId = (int) InternalDataSerializer.readSignedVL(in);
+    this.regionEvent = DataSerializer.readObject(in);
+  }
+
+  @Override
+  public EventID getEventID() {
+    return regionEvent.getEventId();
+  }
+
+  /**
+   * Invoked on the receiving member to run the clear against the local PartitionedRegion and to
+   * reply to the sender with the outcome. Note: it is very important that this message does NOT
+   * cause any deadlocks, as the sender will wait indefinitely for the acknowledgement.
+   */
+  @Override
+  @VisibleForTesting
+  protected boolean operateOnPartitionedRegion(ClusterDistributionManager distributionManager,
+      PartitionedRegion region, long startTime) {
+    ReplyException failure = null;
+    try {
+      result = doLocalClear(region);
+    } catch (ForceReattemptException ex) {
+      // Carry the failure back to the sender inside the reply
+      failure = new ReplyException(ex);
+    }
+    sendReply(getSender(), getProcessorId(), distributionManager, failure, region, startTime);
+    return false;
+  }
+
+  /**
+   * Clears this member's bucket for bucketId while holding the lock named by the bucket's path.
+   *
+   * @return true when the clear completes; every failure is reported by exception
+   * @throws ForceReattemptException if not primary, the lock is not obtained, or the clear fails
+   */
+  public boolean doLocalClear(PartitionedRegion region) throws ForceReattemptException {
+    // Retrieve local bucket region which matches target bucketId
+    BucketRegion bucketRegion = region.getDataStore().getInitializedBucketForId(null, bucketId);
+    // Check if we are primary, throw exception if not
+    if (!bucketRegion.isPrimary()) {
+      throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
+    }
+
+    DistributedLockService lockService = getPartitionRegionLockService();
+    String lockName = bucketRegion.getFullPath();
+    // Acquire outside the try block so that finally only releases a lock this thread holds
+    if (!lockService.lock(lockName, LOCK_WAIT_TIMEOUT_MS, -1)) {
+      throw new ForceReattemptException(BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE);
+    }
+    try {
+      // Double check if we are still primary, as this could have changed between our first check
+      // and obtaining the lock
+      if (!bucketRegion.isPrimary()) {
+        throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
+      }
+      try {
+        bucketRegion.cmnClearRegion(regionEvent, true, true);
+      } catch (Exception ex) {
+        throw new ForceReattemptException(
+            EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex);
+      }
+    } finally {
+      lockService.unlock(lockName);
+    }
+    return true;
+  }
+
+  // Extracted for testing
+  protected DistributedLockService getPartitionRegionLockService() {
+    return DistributedLockService
+        .getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
+  }
+
+  @Override
+  public boolean canStartRemoteTransaction() {
+    return false;
+  }
+
+  @Override
+  protected void sendReply(InternalDistributedMember member, int processorId,
+      DistributionManager distributionManager, ReplyException ex,
+      PartitionedRegion partitionedRegion, long startTime) {
+    if (partitionedRegion != null) {
+      if (startTime > 0) {
+        partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime);
+      }
+    }
+    ClearReplyMessage.send(member, processorId, getReplySender(distributionManager), this.result,
+        ex);
+  }
+
+  @Override
+  protected void appendFields(StringBuilder buff) {
+    super.appendFields(buff);
+    buff.append("; bucketId=").append(this.bucketId);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buff = new StringBuilder();
+    String className = getClass().getName();
+    buff.append(className.substring(className.indexOf(PN_TOKEN) + PN_TOKEN.length())); // partition.<foo>
+    buff.append("(prid="); // make sure this is the first one
+    buff.append(this.regionId);
+
+    // Append name, if we have it
+    String name = null;
+    try {
+      PartitionedRegion region = PartitionedRegion.getPRFromId(this.regionId);
+      if (region != null) {
+        name = region.getFullPath();
+      }
+    } catch (Exception ignore) {
+      /* ignored */
+    }
+    if (name != null) {
+      buff.append(" (name = \"").append(name).append("\")");
+    }
+
+    appendFields(buff);
+    buff.append(" ,distTx=");
+    buff.append(this.isTransactionDistributed);
+    buff.append(")");
+    return buff.toString();
+  }
+
+  public static class ClearReplyMessage extends ReplyMessage {
+    /** Result of the Clear operation */
+    boolean result;
+
+    @Override
+    public boolean getInlineProcess() {
+      return true;
+    }
+
+    /**
+     * Empty constructor to conform to DataSerializable interface
+     */
+    @SuppressWarnings("unused")
+    public ClearReplyMessage() {}
+
+    private ClearReplyMessage(int processorId, boolean result, ReplyException ex) {
+      super();
+      this.result = result;
+      setProcessorId(processorId);
+      setException(ex);
+    }
+
+    /** Send an ack */
+    public static void send(InternalDistributedMember recipient, int processorId,
+        ReplySender replySender,
+        boolean result, ReplyException ex) {
+      Assert.assertTrue(recipient != null, "ClearReplyMessage NULL reply message");
+      ClearReplyMessage message = new ClearReplyMessage(processorId, result, ex);
+      message.setRecipient(recipient);
+      replySender.putOutgoing(message);
+    }
+
+    /**
+     * Processes this message. This method is invoked by the receiver of the message.
+     *
+     * @param distributionManager the distribution manager that is processing the message.
+     */
+    @Override
+    public void process(final DistributionManager distributionManager,
+        final ReplyProcessor21 replyProcessor) {
+      final long startTime = getTimestamp();
+      if (replyProcessor == null) {
+        if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
+          logger.trace(LogMarker.DM_VERBOSE, "{}: processor not found", this);
+        }
+        return;
+      }
+      if (replyProcessor instanceof ClearResponse) {
+        ((ClearResponse) replyProcessor).setResponse(this);
+      }
+      replyProcessor.process(this);
+
+      if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
+        logger.trace(LogMarker.DM_VERBOSE, "{} processed {}", replyProcessor, this);
+      }
+      distributionManager.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime);
+    }
+
+    @Override
+    public int getDSFID() {
+      return PR_CLEAR_REPLY_MESSAGE;
+    }
+
+    @Override
+    public void fromData(DataInput in,
+        DeserializationContext context) throws IOException, ClassNotFoundException {
+      super.fromData(in, context);
+      this.result = in.readBoolean();
+    }
+
+    @Override
+    public void toData(DataOutput out,
+        SerializationContext context) throws IOException {
+      super.toData(out, context);
+      out.writeBoolean(this.result);
+    }
+
+    @Override
+    public String toString() {
+      return "ClearReplyMessage " + "processorid=" + this.processorId + " returning " + this.result
+          + " exception=" + getException();
+    }
+  }
+
+  /**
+   * A processor to capture the value returned by {@link ClearPRMessage}
+   */
+  public static class ClearResponse extends PartitionResponse {
+    private volatile boolean returnValue;
+
+    public ClearResponse(InternalDistributedSystem distributedSystem,
+        Set<InternalDistributedMember> recipients) {
+      super(distributedSystem, recipients, false);
+    }
+
+    public void setResponse(ClearReplyMessage response) {
+      this.returnValue = response.result;
+    }
+
+    /**
+     * @return the result of the remote clear operation
+     * @throws ForceReattemptException if the peer is no longer available
+     * @throws CacheException if the peer generates an error
+     */
+    public boolean waitForResult() throws CacheException, ForceReattemptException {
+      waitForCacheException();
+      return this.returnValue;
+    }
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
index 72e6657..c7cf5a6 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
@@ -14,7 +14,9 @@
  */
 package org.apache.geode.internal.cache;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.anyLong;
@@ -31,7 +33,10 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.junit.Test;
+
 import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
 import org.apache.geode.internal.statistics.StatisticsClock;
 
 public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
@@ -128,4 +133,76 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
     }
   }
 
+  @Test
+  public void cmnClearRegionWillDoNothingIfNotPrimary() {
+    RegionEventImpl event = createClearRegionEvent();
+    BucketRegion region = (BucketRegion) event.getRegion();
+    BucketAdvisor ba = mock(BucketAdvisor.class);
+    RegionVersionVector rvv = mock(RegionVersionVector.class);
+    doReturn(rvv).when(region).getVersionVector();
+    doReturn(ba).when(region).getBucketAdvisor();
+    when(ba.isPrimary()).thenReturn(false);
+    region.cmnClearRegion(event, true, true);
+    verify(region, never()).clearRegionLocally(eq(event), eq(true), eq(rvv));
+  }
+
+  @Test
+  public void cmnClearRegionCalledOnPrimary() {
+    RegionEventImpl event = createClearRegionEvent();
+    BucketRegion region = (BucketRegion) event.getRegion();
+    BucketAdvisor ba = mock(BucketAdvisor.class);
+    RegionVersionVector rvv = mock(RegionVersionVector.class);
+    doReturn(rvv).when(region).getVersionVector();
+    doReturn(true).when(region).getConcurrencyChecksEnabled();
+    doReturn(ba).when(region).getBucketAdvisor();
+    doNothing().when(region).distributeClearOperation(any(), any(), any());
+    doNothing().when(region).lockLocallyForClear(any(), any(), any());
+    doNothing().when(region).clearRegionLocally(event, true, null);
+    when(ba.isPrimary()).thenReturn(true);
+    region.cmnClearRegion(event, true, true);
+    verify(region, times(1)).clearRegionLocally(eq(event), eq(true), eq(null));
+  }
+
+  @Test
+  public void clearWillUseNullAsRVVWhenConcurrencyCheckDisabled() {
+    RegionEventImpl event = createClearRegionEvent();
+    BucketRegion region = (BucketRegion) event.getRegion();
+    BucketAdvisor ba = mock(BucketAdvisor.class);
+    doReturn(false).when(region).getConcurrencyChecksEnabled();
+    doReturn(ba).when(region).getBucketAdvisor();
+    doNothing().when(region).distributeClearOperation(any(), any(), any());
+    doNothing().when(region).lockLocallyForClear(any(), any(), any());
+    doNothing().when(region).clearRegionLocally(event, true, null);
+    when(ba.isPrimary()).thenReturn(true);
+    region.cmnClearRegion(event, true, true);
+    verify(region, times(1)).clearRegionLocally(eq(event), eq(true), eq(null));
+  }
+
+  @Test
+  public void obtainWriteLocksForClearInBRShouldNotDistribute() {
+    RegionEventImpl event = createClearRegionEvent();
+    BucketRegion region = (BucketRegion) event.getRegion();
+    doNothing().when(region).lockLocallyForClear(any(), any(), any());
+    region.obtainWriteLocksForClear(event, null);
+    assertTrue(region.isUsedForPartitionedRegionBucket());
+  }
+
+  @Test
+  public void updateSizeToZeroOnClearBucketRegion() {
+    RegionEventImpl event = createClearRegionEvent();
+    BucketRegion region = (BucketRegion) event.getRegion();
+    PartitionedRegion pr = region.getPartitionedRegion();
+    PartitionedRegionDataStore prds = mock(PartitionedRegionDataStore.class);
+    PartitionedRegionStats prStats = mock(PartitionedRegionStats.class);
+    when(pr.getPrStats()).thenReturn(prStats);
+    doNothing().when(prStats).incDataStoreEntryCount(anyInt());
+    doNothing().when(prds).updateMemoryStats(anyInt());
+    when(pr.getDataStore()).thenReturn(prds);
+    region.updateSizeOnCreate("key1", 20);
+    long sizeBeforeClear = region.getTotalBytes();
+    assertEquals(20, sizeBeforeClear);
+    region.updateSizeOnClearRegion((int) sizeBeforeClear);
+    long sizeAfterClear = region.getTotalBytes();
+    assertEquals(0, sizeAfterClear);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
index 9fbd8fc..ca53ced 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.cache;
 
+import static org.apache.geode.internal.Assert.fail;
 import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertFalse;
@@ -53,6 +54,14 @@ public class DistributedRegionJUnitTest
   @Override
   protected void setInternalRegionArguments(InternalRegionArguments ira) {}
 
+  protected RegionEventImpl createClearRegionEvent() {
+    DistributedRegion region = prepare(true, true);
+    DistributedMember member = mock(DistributedMember.class);
+    RegionEventImpl regionEvent = new RegionEventImpl(region, Operation.REGION_CLEAR, null, false,
+        member, true);
+    return regionEvent;
+  }
+
   @Override
   protected DistributedRegion createAndDefineRegion(boolean isConcurrencyChecksEnabled,
       RegionAttributes ra, InternalRegionArguments ira, GemFireCacheImpl cache,
@@ -246,4 +255,13 @@ public class DistributedRegionJUnitTest
     region.basicBridgeReplace("key1", "value1", false, null, client, true, clientEvent);
     assertThat(clientEvent.getVersionTag().equals(tag));
   }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void localClearIsNotSupportedOnReplicatedRegion() {
+    RegionEventImpl event = createClearRegionEvent();
+    DistributedRegion region = (DistributedRegion) event.getRegion();
+    region.basicLocalClear(event);
+    fail("Expect UnsupportedOperationException");
+  }
+
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
new file mode 100644
index 0000000..2cf5231
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.notNull;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.distributed.DistributedLockService;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.ReplySender;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.ForceReattemptException;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionDataStore;
+import org.apache.geode.internal.cache.PartitionedRegionStats;
+
+public class ClearPRMessageTest {
+
+  ClearPRMessage message;
+  PartitionedRegion region;
+  PartitionedRegionDataStore dataStore;
+  BucketRegion bucketRegion;
+
+  @Before
+  public void setup() throws ForceReattemptException {
+    message = spy(new ClearPRMessage());
+    region = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
+    dataStore = mock(PartitionedRegionDataStore.class);
+    when(region.getDataStore()).thenReturn(dataStore);
+    bucketRegion = mock(BucketRegion.class);
+    when(dataStore.getInitializedBucketForId(any(), any())).thenReturn(bucketRegion);
+  }
+
+  @Test
+  public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAtFirstCheck() {
+    when(bucketRegion.isPrimary()).thenReturn(false);
+
+    assertThatThrownBy(() -> message.doLocalClear(region))
+        .isInstanceOf(ForceReattemptException.class)
+        .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
+  }
+
+  @Test
+  public void doLocalClearThrowsExceptionWhenLockCannotBeObtained() {
+    DistributedLockService mockLockService = mock(DistributedLockService.class);
+    doReturn(mockLockService).when(message).getPartitionRegionLockService();
+
+    when(mockLockService.lock(anyString(), anyLong(), anyLong())).thenReturn(false);
+    when(bucketRegion.isPrimary()).thenReturn(true);
+
+    assertThatThrownBy(() -> message.doLocalClear(region))
+        .isInstanceOf(ForceReattemptException.class)
+        .hasMessageContaining(ClearPRMessage.BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE);
+  }
+
+  @Test
+  public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAfterObtainingLock() {
+    DistributedLockService mockLockService = mock(DistributedLockService.class);
+    doReturn(mockLockService).when(message).getPartitionRegionLockService();
+
+    // Be primary on the first check, then be not primary on the second check
+    when(bucketRegion.isPrimary()).thenReturn(true).thenReturn(false);
+    when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+
+    assertThatThrownBy(() -> message.doLocalClear(region))
+        .isInstanceOf(ForceReattemptException.class)
+        .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
+    // Confirm that we actually obtained and released the lock
+    verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong());
+    verify(mockLockService, times(1)).unlock(any());
+  }
+
+  @Test
+  public void doLocalClearThrowsForceReattemptExceptionWhenAnExceptionIsThrownDuringClearOperation() {
+    DistributedLockService mockLockService = mock(DistributedLockService.class);
+    doReturn(mockLockService).when(message).getPartitionRegionLockService();
+    NullPointerException exception = new NullPointerException("Error encountered");
+    doThrow(exception).when(bucketRegion).cmnClearRegion(any(), anyBoolean(), anyBoolean());
+
+    // Be primary on every check so that the clear operation itself is attempted (and throws)
+    when(bucketRegion.isPrimary()).thenReturn(true);
+    when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+
+    assertThatThrownBy(() -> message.doLocalClear(region))
+        .isInstanceOf(ForceReattemptException.class)
+        .hasMessageContaining(ClearPRMessage.EXCEPTION_THROWN_DURING_CLEAR_OPERATION);
+
+    // Confirm that cmnClearRegion was called
+    verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean());
+  }
+
+  @Test
+  public void doLocalClearInvokesCmnClearRegionWhenBucketIsPrimaryAndLockIsObtained()
+      throws ForceReattemptException {
+    DistributedLockService mockLockService = mock(DistributedLockService.class);
+    doReturn(mockLockService).when(message).getPartitionRegionLockService();
+
+
+    // Be primary on every check so that the clear proceeds all the way to cmnClearRegion
+    when(bucketRegion.isPrimary()).thenReturn(true);
+    when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+    assertThat(message.doLocalClear(region)).isTrue();
+
+    // Confirm that cmnClearRegion was called
+    verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean());
+
+    // Confirm that we actually obtained and released the lock
+    verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong());
+    verify(mockLockService, times(1)).unlock(any());
+  }
+
+  @Test
+  public void initMessageSetsReplyProcessorCorrectlyWithDefinedReplyProcessor() {
+    InternalDistributedMember sender = mock(InternalDistributedMember.class);
+
+    Set<InternalDistributedMember> recipients = new HashSet<>();
+    recipients.add(sender);
+
+    ClearPRMessage.ClearResponse mockProcessor = mock(ClearPRMessage.ClearResponse.class);
+    int mockProcessorId = 5;
+    when(mockProcessor.getProcessorId()).thenReturn(mockProcessorId);
+
+    message.initMessage(region, recipients, mockProcessor);
+
+    verify(mockProcessor, times(1)).enableSevereAlertProcessing();
+    assertThat(message.getProcessorId()).isEqualTo(mockProcessorId);
+  }
+
+  @Test
+  public void initMessageSetsProcessorIdToZeroWithNullProcessor() {
+    message.initMessage(region, null, null);
+
+    assertThat(message.getProcessorId()).isEqualTo(0);
+  }
+
+  @Test
+  public void sendThrowsExceptionIfPutOutgoingMethodReturnsNonNullSetOfFailures() {
+    InternalDistributedMember recipient = mock(InternalDistributedMember.class);
+
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    when(region.getDistributionManager()).thenReturn(distributionManager);
+
+    doNothing().when(message).initMessage(any(), any(), any());
+    Set<InternalDistributedMember> failures = new HashSet<>();
+    failures.add(recipient);
+
+    when(distributionManager.putOutgoing(message)).thenReturn(failures);
+
+    assertThatThrownBy(() -> message.send(recipient, region))
+        .isInstanceOf(ForceReattemptException.class)
+        .hasMessageContaining("Failed sending <" + message + ">");
+  }
+
+  @SuppressWarnings("ResultOfMethodCallIgnored")
+  @Test
+  public void operateOnPartitionedRegionCallsSendReplyWithNoExceptionWhenDoLocalClearSucceeds()
+      throws ForceReattemptException {
+    ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class);
+    InternalDistributedMember sender = mock(InternalDistributedMember.class);
+    int processorId = 1000;
+    int startTime = 0;
+
+    doReturn(true).when(message).doLocalClear(region);
+    doReturn(sender).when(message).getSender();
+    doReturn(processorId).when(message).getProcessorId();
+
+    // We don't want to deal with mocking the behavior of sendReply() in this test, so we mock it to
+    // do nothing and verify later that it was called with proper input
+    doNothing().when(message).sendReply(any(), anyInt(), any(), any(), any(), anyLong());
+
+    message.operateOnPartitionedRegion(distributionManager, region, startTime);
+
+    verify(message, times(1)).sendReply(sender, processorId, distributionManager, null, region,
+        startTime);
+  }
+
+  @SuppressWarnings("ResultOfMethodCallIgnored")
+  @Test
+  public void operateOnPartitionedRegionCallsSendReplyWithExceptionWhenDoLocalClearFailsWithException()
+      throws ForceReattemptException {
+    ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class);
+    InternalDistributedMember sender = mock(InternalDistributedMember.class);
+    int processorId = 1000;
+    int startTime = 0;
+    ForceReattemptException exception =
+        new ForceReattemptException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
+
+    doThrow(exception).when(message).doLocalClear(region);
+    doReturn(sender).when(message).getSender();
+    doReturn(processorId).when(message).getProcessorId();
+
+    // We don't want to deal with mocking the behavior of sendReply() in this test, so we mock it to
+    // do nothing and verify later that it was called with proper input
+    doNothing().when(message).sendReply(any(), anyInt(), any(), any(), any(), anyLong());
+
+    message.operateOnPartitionedRegion(distributionManager, region, startTime);
+
+    verify(message, times(1)).sendReply(any(), anyInt(), any(), notNull(), any(), anyLong());
+  }
+
+  @Test
+  public void sendReplyEndsMessageProcessingIfWeHaveARegionAndHaveStartedProcessing() {
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    InternalDistributedMember recipient = mock(InternalDistributedMember.class);
+    PartitionedRegionStats partitionedRegionStats = mock(PartitionedRegionStats.class);
+    when(region.getPrStats()).thenReturn(partitionedRegionStats);
+
+    int processorId = 1000;
+    int startTime = 10000;
+    ReplyException exception = new ReplyException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
+
+    ReplySender replySender = mock(ReplySender.class);
+    doReturn(replySender).when(message).getReplySender(distributionManager);
+
+    message.sendReply(recipient, processorId, distributionManager, exception, region, startTime);
+
+    verify(partitionedRegionStats, times(1)).endPartitionMessagesProcessing(startTime);
+  }
+
+  @Test
+  public void sendReplyDoesNotEndMessageProcessingIfStartTimeIsZero() {
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    InternalDistributedMember recipient = mock(InternalDistributedMember.class);
+    PartitionedRegionStats partitionedRegionStats = mock(PartitionedRegionStats.class);
+    when(region.getPrStats()).thenReturn(partitionedRegionStats);
+
+    int processorId = 1000;
+    int startTime = 0;
+    ReplyException exception = new ReplyException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
+
+    ReplySender replySender = mock(ReplySender.class);
+    doReturn(replySender).when(message).getReplySender(distributionManager);
+
+    message.sendReply(recipient, processorId, distributionManager, exception, region, startTime);
+
+    verify(partitionedRegionStats, times(0)).endPartitionMessagesProcessing(startTime);
+  }
+
+  @Test
+  public void clearReplyMessageProcessCallsSetResponseIfReplyProcessorIsInstanceOfClearResponse() {
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    DMStats mockStats = mock(DMStats.class);
+    when(distributionManager.getStats()).thenReturn(mockStats);
+    ClearPRMessage.ClearReplyMessage clearReplyMessage = new ClearPRMessage.ClearReplyMessage();
+    ClearPRMessage.ClearResponse mockProcessor = mock(ClearPRMessage.ClearResponse.class);
+
+    clearReplyMessage.process(distributionManager, mockProcessor);
+
+    verify(mockProcessor, times(1)).setResponse(clearReplyMessage);
+  }
+}
diff --git a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
index ff1571c..3598b5d 100644
--- a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
+++ b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
@@ -57,6 +57,9 @@ public interface DataSerializableFixedID extends SerializationVersions, BasicSer
   // NOTE, codes < -65536 will take 4 bytes to serialize
   // NOTE, codes < -128 will take 2 bytes to serialize
 
+  short PR_CLEAR_REPLY_MESSAGE = -164;
+  short PR_CLEAR_MESSAGE = -163;
+
   short DISTRIBUTED_PING_MESSAGE = -162;
 
   short REGION_REDUNDANCY_STATUS = -161;