You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2021/04/30 01:16:41 UTC

[geode] 03/17: GEODE-9132: Delete ClearPRMessage

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git

commit a6b7732f9388ac1a83410ae48f3c25fab66d0e63
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Apr 9 15:06:40 2021 -0700

    GEODE-9132: Delete ClearPRMessage
---
 .../org/apache/geode/internal/DSFIDFactory.java    |   3 -
 .../geode/internal/cache/PartitionedRegion.java    |  10 -
 .../internal/cache/partitioned/ClearPRMessage.java | 320 ---------------------
 .../internal/cache/PartitionedRegionTest.java      |  15 -
 .../cache/partitioned/ClearPRMessageTest.java      | 260 -----------------
 5 files changed, 608 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index f0658a6..5ad6058 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -290,7 +290,6 @@ import org.apache.geode.internal.cache.partitioned.BucketCountLoadProbe;
 import org.apache.geode.internal.cache.partitioned.BucketProfileUpdateMessage;
 import org.apache.geode.internal.cache.partitioned.BucketSizeMessage;
 import org.apache.geode.internal.cache.partitioned.BucketSizeMessage.BucketSizeReplyMessage;
-import org.apache.geode.internal.cache.partitioned.ClearPRMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueReplyMessage;
 import org.apache.geode.internal.cache.partitioned.CreateBucketMessage;
@@ -991,8 +990,6 @@ public class DSFIDFactory implements DataSerializableFixedID {
     serializer.registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY,
         GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationEntry.class);
     serializer.registerDSFID(ABORT_BACKUP_REQUEST, AbortBackupRequest.class);
-    serializer.registerDSFID(PR_CLEAR_MESSAGE, ClearPRMessage.class);
-    serializer.registerDSFID(PR_CLEAR_REPLY_MESSAGE, ClearPRMessage.ClearReplyMessage.class);
     serializer.registerDSFID(HOST_AND_PORT, HostAndPort.class);
     serializer.registerDSFID(DISTRIBUTED_PING_MESSAGE, DistributedPingMessage.class);
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index a62b2b5..37b5383 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -180,7 +180,6 @@ import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultWa
 import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
 import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
-import org.apache.geode.internal.cache.partitioned.ClearPRMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueResponse;
 import org.apache.geode.internal.cache.partitioned.DestroyMessage;
@@ -2182,15 +2181,6 @@ public class PartitionedRegion extends LocalRegion
     throw new UnsupportedOperationException();
   }
 
-  List<ClearPRMessage> createClearPRMessages(EventID eventID) {
-    ArrayList<ClearPRMessage> clearMsgList = new ArrayList<>();
-    for (int bucketId = 0; bucketId < getTotalNumberOfBuckets(); bucketId++) {
-      ClearPRMessage clearPRMessage = new ClearPRMessage(bucketId, eventID);
-      clearMsgList.add(clearPRMessage);
-    }
-    return clearMsgList;
-  }
-
   @Override
   void basicLocalClear(RegionEventImpl event) {
     throw new UnsupportedOperationException();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
deleted file mode 100644
index 2603b78..0000000
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.geode.internal.cache.partitioned;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Set;
-
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.DataSerializer;
-import org.apache.geode.annotations.VisibleForTesting;
-import org.apache.geode.cache.CacheException;
-import org.apache.geode.cache.Operation;
-import org.apache.geode.cache.persistence.PartitionOfflineException;
-import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.ClusterDistributionManager;
-import org.apache.geode.distributed.internal.DirectReplyProcessor;
-import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.distributed.internal.ReplyException;
-import org.apache.geode.distributed.internal.ReplyMessage;
-import org.apache.geode.distributed.internal.ReplyProcessor21;
-import org.apache.geode.distributed.internal.ReplySender;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.InternalDataSerializer;
-import org.apache.geode.internal.NanoTimer;
-import org.apache.geode.internal.cache.BucketRegion;
-import org.apache.geode.internal.cache.EventID;
-import org.apache.geode.internal.cache.ForceReattemptException;
-import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.RegionEventImpl;
-import org.apache.geode.internal.logging.log4j.LogMarker;
-import org.apache.geode.internal.serialization.DeserializationContext;
-import org.apache.geode.internal.serialization.SerializationContext;
-import org.apache.geode.logging.internal.log4j.api.LogService;
-
-public class ClearPRMessage extends PartitionMessageWithDirectReply {
-  private static final Logger logger = LogService.getLogger();
-
-  private Integer bucketId;
-
-  private EventID eventID;
-
-  public static final String BUCKET_NON_PRIMARY_MESSAGE =
-      "The bucket region on target member is no longer primary";
-  public static final String EXCEPTION_THROWN_DURING_CLEAR_OPERATION =
-      "An exception was thrown during the local clear operation: ";
-
-  /**
-   * state from operateOnRegion that must be preserved for transmission from the waiting pool
-   */
-  transient boolean result = false;
-
-  /**
-   * Empty constructor to satisfy {@link DataSerializer}requirements
-   */
-  public ClearPRMessage() {}
-
-  public ClearPRMessage(int bucketId, EventID eventID) {
-    this.bucketId = bucketId;
-    this.eventID = eventID;
-  }
-
-  public void initMessage(PartitionedRegion region, Set<InternalDistributedMember> recipients,
-      DirectReplyProcessor replyProcessor) {
-    this.resetRecipients();
-    if (recipients != null) {
-      setRecipients(recipients);
-    }
-    this.regionId = region.getPRId();
-    this.processor = replyProcessor;
-    this.processorId = replyProcessor == null ? 0 : replyProcessor.getProcessorId();
-    if (replyProcessor != null) {
-      replyProcessor.enableSevereAlertProcessing();
-    }
-  }
-
-  public ClearResponse send(DistributedMember recipient, PartitionedRegion region)
-      throws ForceReattemptException {
-    Set<InternalDistributedMember> recipients =
-        Collections.singleton((InternalDistributedMember) recipient);
-    ClearResponse clearResponse = new ClearResponse(region.getSystem(), recipients);
-    initMessage(region, recipients, clearResponse);
-    if (logger.isDebugEnabled()) {
-      logger.debug("ClearPRMessage.send: recipient is {}, msg is {}", recipient, this);
-    }
-
-    Set<InternalDistributedMember> failures = region.getDistributionManager().putOutgoing(this);
-    if (failures != null && failures.size() > 0) {
-      throw new ForceReattemptException("Failed sending <" + this + "> due to " + failures);
-    }
-    return clearResponse;
-  }
-
-  @Override
-  public int getDSFID() {
-    return PR_CLEAR_MESSAGE;
-  }
-
-  @Override
-  public void toData(DataOutput out, SerializationContext context) throws IOException {
-    super.toData(out, context);
-    if (bucketId == null) {
-      InternalDataSerializer.writeSignedVL(-1, out);
-    } else {
-      InternalDataSerializer.writeSignedVL(bucketId, out);
-    }
-    DataSerializer.writeObject(this.eventID, out);
-  }
-
-  @Override
-  public void fromData(DataInput in, DeserializationContext context)
-      throws IOException, ClassNotFoundException {
-    super.fromData(in, context);
-    this.bucketId = (int) InternalDataSerializer.readSignedVL(in);
-    this.eventID = (EventID) DataSerializer.readObject(in);
-  }
-
-  @Override
-  public EventID getEventID() {
-    return null;
-  }
-
-  /**
-   * This method is called upon receipt and make the desired changes to the PartitionedRegion Note:
-   * It is very important that this message does NOT cause any deadlocks as the sender will wait
-   * indefinitely for the acknowledgement
-   */
-  @Override
-  @VisibleForTesting
-  protected boolean operateOnPartitionedRegion(ClusterDistributionManager distributionManager,
-      PartitionedRegion region, long startTime) {
-    try {
-      this.result = doLocalClear(region);
-    } catch (ForceReattemptException ex) {
-      sendReply(getSender(), getProcessorId(), distributionManager, new ReplyException(ex), region,
-          startTime);
-      return false;
-    }
-    return this.result;
-  }
-
-  public Integer getBucketId() {
-    return this.bucketId;
-  }
-
-  public boolean doLocalClear(PartitionedRegion region)
-      throws ForceReattemptException {
-    // Retrieve local bucket region which matches target bucketId
-    BucketRegion bucketRegion =
-        region.getDataStore().getInitializedBucketForId(null, this.bucketId);
-
-    boolean lockedForPrimary = bucketRegion.doLockForPrimary(false);
-    // Check if we obtained primary lock, throw exception if not
-    if (!lockedForPrimary) {
-      throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
-    }
-    try {
-      RegionEventImpl regionEvent = new RegionEventImpl(bucketRegion, Operation.REGION_CLEAR, null,
-          false, region.getMyId(), eventID);
-      bucketRegion.cmnClearRegion(regionEvent, false, true);
-    } catch (PartitionOfflineException poe) {
-      logger.info(
-          "All members holding data for bucket {} are offline, no more retries will be attempted",
-          this.bucketId,
-          poe);
-      throw poe;
-    } catch (Exception ex) {
-      throw new ForceReattemptException(
-          EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex);
-    } finally {
-      bucketRegion.doUnlockForPrimary();
-    }
-
-    return true;
-  }
-
-  @Override
-  public boolean canStartRemoteTransaction() {
-    return false;
-  }
-
-  @Override
-  protected void sendReply(InternalDistributedMember member, int processorId,
-      DistributionManager distributionManager, ReplyException ex,
-      PartitionedRegion partitionedRegion, long startTime) {
-    if (partitionedRegion != null) {
-      if (startTime > 0) {
-        partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime);
-      }
-    }
-    ClearReplyMessage.send(member, processorId, getReplySender(distributionManager), this.result,
-        ex);
-  }
-
-  @Override
-  protected void appendFields(StringBuilder buff) {
-    super.appendFields(buff);
-    buff.append("; bucketId=").append(this.bucketId);
-  }
-
-  public static class ClearReplyMessage extends ReplyMessage {
-    @Override
-    public boolean getInlineProcess() {
-      return true;
-    }
-
-    /**
-     * Empty constructor to conform to DataSerializable interface
-     */
-    @SuppressWarnings("unused")
-    public ClearReplyMessage() {}
-
-    private ClearReplyMessage(int processorId, boolean result, ReplyException ex) {
-      super();
-      setProcessorId(processorId);
-      if (ex != null) {
-        setException(ex);
-      } else {
-        setReturnValue(result);
-      }
-    }
-
-    /**
-     * Send an ack
-     */
-    public static void send(InternalDistributedMember recipient, int processorId,
-        ReplySender replySender,
-        boolean result, ReplyException ex) {
-      Assert.assertNotNull(recipient, "ClearReplyMessage recipient was NULL.");
-      ClearReplyMessage message = new ClearReplyMessage(processorId, result, ex);
-      message.setRecipient(recipient);
-      replySender.putOutgoing(message);
-    }
-
-    /**
-     * Processes this message. This method is invoked by the receiver of the message.
-     *
-     * @param distributionManager the distribution manager that is processing the message.
-     */
-    @Override
-    public void process(final DistributionManager distributionManager,
-        final ReplyProcessor21 replyProcessor) {
-      final long startTime = getTimestamp();
-      if (replyProcessor == null) {
-        if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
-          logger.trace(LogMarker.DM_VERBOSE, "{}: processor not found", this);
-        }
-        return;
-      }
-      if (replyProcessor instanceof ClearResponse) {
-        ((ClearResponse) replyProcessor).setResponse(this);
-      }
-      replyProcessor.process(this);
-
-      if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
-        logger.trace(LogMarker.DM_VERBOSE, "{} processed {}", replyProcessor, this);
-      }
-      distributionManager.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime);
-    }
-
-    @Override
-    public int getDSFID() {
-      return PR_CLEAR_REPLY_MESSAGE;
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder stringBuilder = new StringBuilder(super.toString());
-      stringBuilder.append(" returnValue=");
-      stringBuilder.append(getReturnValue());
-      return stringBuilder.toString();
-    }
-  }
-
-  /**
-   * A processor to capture the value returned by {@link ClearPRMessage}
-   */
-  public static class ClearResponse extends PartitionResponse {
-    private volatile boolean returnValue;
-
-    public ClearResponse(InternalDistributedSystem distributedSystem,
-        Set<InternalDistributedMember> recipients) {
-      super(distributedSystem, recipients, false);
-    }
-
-    public void setResponse(ClearReplyMessage response) {
-      if (response.getException() == null) {
-        this.returnValue = (boolean) response.getReturnValue();
-      }
-    }
-
-    /**
-     * @return the result of the remote clear operation
-     * @throws ForceReattemptException if the peer is no longer available
-     * @throws CacheException if the peer generates an error
-     */
-    public boolean waitForResult() throws CacheException, ForceReattemptException {
-      waitForCacheException();
-      return this.returnValue;
-    }
-  }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
index f99b74b..2a2897d 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
@@ -40,7 +40,6 @@ import static org.mockito.quality.Strictness.STRICT_STUBS;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -73,7 +72,6 @@ import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.control.InternalResourceManager;
-import org.apache.geode.internal.cache.partitioned.ClearPRMessage;
 import org.apache.geode.internal.cache.partitioned.colocation.ColocationLoggerFactory;
 
 @RunWith(JUnitParamsRunner.class)
@@ -217,19 +215,6 @@ public class PartitionedRegionTest {
   }
 
   @Test
-  public void createClearPRMessagesShouldCreateMessagePerBucket() {
-    PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
-    RegionEventImpl regionEvent =
-        new RegionEventImpl(spyPartitionedRegion, Operation.REGION_CLEAR, null, false,
-            spyPartitionedRegion.getMyId(), true);
-    when(spyPartitionedRegion.getTotalNumberOfBuckets()).thenReturn(3);
-    EventID eventID = new EventID(spyPartitionedRegion.getCache().getDistributedSystem());
-    List<ClearPRMessage> msgs = spyPartitionedRegion.createClearPRMessages(eventID);
-    assertThat(msgs.size()).isEqualTo(3);
-  }
-
-
-  @Test
   public void getBucketNodeForReadOrWriteReturnsPrimaryNodeForRegisterInterest() {
     // ARRANGE
     EntryEventImpl clientEvent = mock(EntryEventImpl.class);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
deleted file mode 100644
index acdd4fc..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.partitioned;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.notNull;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.geode.distributed.internal.ClusterDistributionManager;
-import org.apache.geode.distributed.internal.DMStats;
-import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.ReplyException;
-import org.apache.geode.distributed.internal.ReplySender;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.BucketRegion;
-import org.apache.geode.internal.cache.ForceReattemptException;
-import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.PartitionedRegionDataStore;
-import org.apache.geode.internal.cache.PartitionedRegionStats;
-import org.apache.geode.internal.cache.RegionEventImpl;
-
-public class ClearPRMessageTest {
-
-  ClearPRMessage message;
-  PartitionedRegion region;
-  PartitionedRegionDataStore dataStore;
-  BucketRegion bucketRegion;
-
-  @Before
-  public void setup() throws ForceReattemptException {
-    message = spy(new ClearPRMessage());
-    InternalDistributedMember member = mock(InternalDistributedMember.class);
-    region = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
-    dataStore = mock(PartitionedRegionDataStore.class);
-    when(region.getDataStore()).thenReturn(dataStore);
-    when(region.getFullPath()).thenReturn("/test");
-    bucketRegion = mock(BucketRegion.class);
-    when(dataStore.getInitializedBucketForId(any(), any())).thenReturn(bucketRegion);
-    RegionEventImpl bucketRegionEventImpl = mock(RegionEventImpl.class);
-  }
-
-  @Test
-  public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAtFirstCheck() {
-    when(bucketRegion.isPrimary()).thenReturn(false);
-
-    assertThatThrownBy(() -> message.doLocalClear(region))
-        .isInstanceOf(ForceReattemptException.class)
-        .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
-  }
-
-  @Test
-  public void doLocalClearThrowsExceptionWhenLockCannotBeObtained() {
-    when(bucketRegion.doLockForPrimary(false)).thenReturn(false);
-
-    assertThatThrownBy(() -> message.doLocalClear(region))
-        .isInstanceOf(ForceReattemptException.class)
-        .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
-  }
-
-  @Test
-  public void doLocalClearThrowsForceReattemptExceptionWhenAnExceptionIsThrownDuringClearOperation() {
-    NullPointerException exception = new NullPointerException("Error encountered");
-    doThrow(exception).when(bucketRegion).cmnClearRegion(any(), anyBoolean(), anyBoolean());
-
-    when(bucketRegion.doLockForPrimary(false)).thenReturn(true);
-
-    assertThatThrownBy(() -> message.doLocalClear(region))
-        .isInstanceOf(ForceReattemptException.class)
-        .hasMessageContaining(ClearPRMessage.EXCEPTION_THROWN_DURING_CLEAR_OPERATION);
-
-    // Confirm that cmnClearRegion was called
-    verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean());
-  }
-
-  @Test
-  public void doLocalClearInvokesCmnClearRegionWhenBucketIsPrimaryAndLockIsObtained()
-      throws ForceReattemptException {
-
-    // Be primary on the first check, then be not primary on the second check
-    when(bucketRegion.doLockForPrimary(false)).thenReturn(true);
-    assertThat(message.doLocalClear(region)).isTrue();
-
-    // Confirm that cmnClearRegion was called
-    verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean());
-  }
-
-  @Test
-  public void initMessageSetsReplyProcessorCorrectlyWithDefinedReplyProcessor() {
-    InternalDistributedMember sender = mock(InternalDistributedMember.class);
-
-    Set<InternalDistributedMember> recipients = new HashSet<>();
-    recipients.add(sender);
-
-    ClearPRMessage.ClearResponse mockProcessor = mock(ClearPRMessage.ClearResponse.class);
-    int mockProcessorId = 5;
-    when(mockProcessor.getProcessorId()).thenReturn(mockProcessorId);
-
-    message.initMessage(region, recipients, mockProcessor);
-
-    verify(mockProcessor, times(1)).enableSevereAlertProcessing();
-    assertThat(message.getProcessorId()).isEqualTo(mockProcessorId);
-  }
-
-  @Test
-  public void initMessageSetsProcessorIdToZeroWithNullProcessor() {
-    message.initMessage(region, null, null);
-
-    assertThat(message.getProcessorId()).isEqualTo(0);
-  }
-
-  @Test
-  public void sendThrowsExceptionIfPutOutgoingMethodReturnsNonNullSetOfFailures() {
-    InternalDistributedMember recipient = mock(InternalDistributedMember.class);
-
-    DistributionManager distributionManager = mock(DistributionManager.class);
-    when(region.getDistributionManager()).thenReturn(distributionManager);
-
-    doNothing().when(message).initMessage(any(), any(), any());
-    Set<InternalDistributedMember> failures = new HashSet<>();
-    failures.add(recipient);
-
-    when(distributionManager.putOutgoing(message)).thenReturn(failures);
-
-    assertThatThrownBy(() -> message.send(recipient, region))
-        .isInstanceOf(ForceReattemptException.class)
-        .hasMessageContaining("Failed sending <" + message + ">");
-  }
-
-  @SuppressWarnings("ResultOfMethodCallIgnored")
-  @Test
-  public void operateOnPartitionedRegionCallsSendReplyWithNoExceptionWhenDoLocalClearSucceeds()
-      throws ForceReattemptException {
-    ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class);
-    InternalDistributedMember sender = mock(InternalDistributedMember.class);
-    int processorId = 1000;
-    int startTime = 0;
-
-    doReturn(0).when(message).getBucketId();
-    doReturn(true).when(message).doLocalClear(region);
-    doReturn(sender).when(message).getSender();
-    doReturn(processorId).when(message).getProcessorId();
-
-    // We don't want to deal with mocking the behavior of sendReply() in this test, so we mock it to
-    // do nothing and verify later that it was called with proper input
-    doNothing().when(message).sendReply(any(), anyInt(), any(), any(), any(), anyLong());
-
-    message.operateOnPartitionedRegion(distributionManager, region, startTime);
-    assertThat(message.result).isTrue();
-
-    verify(message, times(0)).sendReply(sender, processorId, distributionManager, null, region,
-        startTime);
-  }
-
-  @SuppressWarnings("ResultOfMethodCallIgnored")
-  @Test
-  public void operateOnPartitionedRegionCallsSendReplyWithExceptionWhenDoLocalClearFailsWithException()
-      throws ForceReattemptException {
-    ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class);
-    InternalDistributedMember sender = mock(InternalDistributedMember.class);
-    int processorId = 1000;
-    int startTime = 0;
-    ForceReattemptException exception =
-        new ForceReattemptException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
-
-    doReturn(0).when(message).getBucketId();
-    doThrow(exception).when(message).doLocalClear(region);
-    doReturn(sender).when(message).getSender();
-    doReturn(processorId).when(message).getProcessorId();
-
-    // We don't want to deal with mocking the behavior of sendReply() in this test, so we mock it to
-    // do nothing and verify later that it was called with proper input
-    doNothing().when(message).sendReply(any(), anyInt(), any(), any(), any(), anyLong());
-
-    message.operateOnPartitionedRegion(distributionManager, region, startTime);
-
-    verify(message, times(1)).sendReply(any(), anyInt(), any(), notNull(), any(), anyLong());
-  }
-
-  @Test
-  public void sendReplyEndsMessageProcessingIfWeHaveARegionAndHaveStartedProcessing() {
-    DistributionManager distributionManager = mock(DistributionManager.class);
-    InternalDistributedMember recipient = mock(InternalDistributedMember.class);
-    PartitionedRegionStats partitionedRegionStats = mock(PartitionedRegionStats.class);
-    when(region.getPrStats()).thenReturn(partitionedRegionStats);
-
-    int processorId = 1000;
-    int startTime = 10000;
-    ReplyException exception = new ReplyException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
-
-    ReplySender replySender = mock(ReplySender.class);
-    doReturn(replySender).when(message).getReplySender(distributionManager);
-
-    message.sendReply(recipient, processorId, distributionManager, exception, region, startTime);
-
-    verify(partitionedRegionStats, times(1)).endPartitionMessagesProcessing(startTime);
-  }
-
-  @Test
-  public void sendReplyDoesNotEndMessageProcessingIfStartTimeIsZero() {
-    DistributionManager distributionManager = mock(DistributionManager.class);
-    InternalDistributedMember recipient = mock(InternalDistributedMember.class);
-    PartitionedRegionStats partitionedRegionStats = mock(PartitionedRegionStats.class);
-    when(region.getPrStats()).thenReturn(partitionedRegionStats);
-
-    int processorId = 1000;
-    int startTime = 0;
-    ReplyException exception = new ReplyException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
-
-    ReplySender replySender = mock(ReplySender.class);
-    doReturn(replySender).when(message).getReplySender(distributionManager);
-
-    message.sendReply(recipient, processorId, distributionManager, exception, region, startTime);
-
-    verify(partitionedRegionStats, times(0)).endPartitionMessagesProcessing(startTime);
-  }
-
-  @Test
-  public void clearReplyMessageProcessCallsSetResponseIfReplyProcessorIsInstanceOfClearResponse() {
-    DistributionManager distributionManager = mock(DistributionManager.class);
-    DMStats mockStats = mock(DMStats.class);
-    when(distributionManager.getStats()).thenReturn(mockStats);
-    ClearPRMessage.ClearReplyMessage clearReplyMessage = new ClearPRMessage.ClearReplyMessage();
-    ClearPRMessage.ClearResponse mockProcessor = mock(ClearPRMessage.ClearResponse.class);
-
-    clearReplyMessage.process(distributionManager, mockProcessor);
-
-    verify(mockProcessor, times(1)).setResponse(clearReplyMessage);
-  }
-}