You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2021/04/30 01:16:41 UTC
[geode] 03/17: GEODE-9132: Delete ClearPRMessage
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
commit a6b7732f9388ac1a83410ae48f3c25fab66d0e63
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Apr 9 15:06:40 2021 -0700
GEODE-9132: Delete ClearPRMessage
---
.../org/apache/geode/internal/DSFIDFactory.java | 3 -
.../geode/internal/cache/PartitionedRegion.java | 10 -
.../internal/cache/partitioned/ClearPRMessage.java | 320 ---------------------
.../internal/cache/PartitionedRegionTest.java | 15 -
.../cache/partitioned/ClearPRMessageTest.java | 260 -----------------
5 files changed, 608 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index f0658a6..5ad6058 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -290,7 +290,6 @@ import org.apache.geode.internal.cache.partitioned.BucketCountLoadProbe;
import org.apache.geode.internal.cache.partitioned.BucketProfileUpdateMessage;
import org.apache.geode.internal.cache.partitioned.BucketSizeMessage;
import org.apache.geode.internal.cache.partitioned.BucketSizeMessage.BucketSizeReplyMessage;
-import org.apache.geode.internal.cache.partitioned.ClearPRMessage;
import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage;
import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueReplyMessage;
import org.apache.geode.internal.cache.partitioned.CreateBucketMessage;
@@ -991,8 +990,6 @@ public class DSFIDFactory implements DataSerializableFixedID {
serializer.registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY,
GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationEntry.class);
serializer.registerDSFID(ABORT_BACKUP_REQUEST, AbortBackupRequest.class);
- serializer.registerDSFID(PR_CLEAR_MESSAGE, ClearPRMessage.class);
- serializer.registerDSFID(PR_CLEAR_REPLY_MESSAGE, ClearPRMessage.ClearReplyMessage.class);
serializer.registerDSFID(HOST_AND_PORT, HostAndPort.class);
serializer.registerDSFID(DISTRIBUTED_PING_MESSAGE, DistributedPingMessage.class);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index a62b2b5..37b5383 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -180,7 +180,6 @@ import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultWa
import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
-import org.apache.geode.internal.cache.partitioned.ClearPRMessage;
import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage;
import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueResponse;
import org.apache.geode.internal.cache.partitioned.DestroyMessage;
@@ -2182,15 +2181,6 @@ public class PartitionedRegion extends LocalRegion
throw new UnsupportedOperationException();
}
- List<ClearPRMessage> createClearPRMessages(EventID eventID) {
- ArrayList<ClearPRMessage> clearMsgList = new ArrayList<>();
- for (int bucketId = 0; bucketId < getTotalNumberOfBuckets(); bucketId++) {
- ClearPRMessage clearPRMessage = new ClearPRMessage(bucketId, eventID);
- clearMsgList.add(clearPRMessage);
- }
- return clearMsgList;
- }
-
@Override
void basicLocalClear(RegionEventImpl event) {
throw new UnsupportedOperationException();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
deleted file mode 100644
index 2603b78..0000000
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.geode.internal.cache.partitioned;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Set;
-
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.DataSerializer;
-import org.apache.geode.annotations.VisibleForTesting;
-import org.apache.geode.cache.CacheException;
-import org.apache.geode.cache.Operation;
-import org.apache.geode.cache.persistence.PartitionOfflineException;
-import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.ClusterDistributionManager;
-import org.apache.geode.distributed.internal.DirectReplyProcessor;
-import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.distributed.internal.ReplyException;
-import org.apache.geode.distributed.internal.ReplyMessage;
-import org.apache.geode.distributed.internal.ReplyProcessor21;
-import org.apache.geode.distributed.internal.ReplySender;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.InternalDataSerializer;
-import org.apache.geode.internal.NanoTimer;
-import org.apache.geode.internal.cache.BucketRegion;
-import org.apache.geode.internal.cache.EventID;
-import org.apache.geode.internal.cache.ForceReattemptException;
-import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.RegionEventImpl;
-import org.apache.geode.internal.logging.log4j.LogMarker;
-import org.apache.geode.internal.serialization.DeserializationContext;
-import org.apache.geode.internal.serialization.SerializationContext;
-import org.apache.geode.logging.internal.log4j.api.LogService;
-
-public class ClearPRMessage extends PartitionMessageWithDirectReply {
- private static final Logger logger = LogService.getLogger();
-
- private Integer bucketId;
-
- private EventID eventID;
-
- public static final String BUCKET_NON_PRIMARY_MESSAGE =
- "The bucket region on target member is no longer primary";
- public static final String EXCEPTION_THROWN_DURING_CLEAR_OPERATION =
- "An exception was thrown during the local clear operation: ";
-
- /**
- * state from operateOnRegion that must be preserved for transmission from the waiting pool
- */
- transient boolean result = false;
-
- /**
- * Empty constructor to satisfy {@link DataSerializer}requirements
- */
- public ClearPRMessage() {}
-
- public ClearPRMessage(int bucketId, EventID eventID) {
- this.bucketId = bucketId;
- this.eventID = eventID;
- }
-
- public void initMessage(PartitionedRegion region, Set<InternalDistributedMember> recipients,
- DirectReplyProcessor replyProcessor) {
- this.resetRecipients();
- if (recipients != null) {
- setRecipients(recipients);
- }
- this.regionId = region.getPRId();
- this.processor = replyProcessor;
- this.processorId = replyProcessor == null ? 0 : replyProcessor.getProcessorId();
- if (replyProcessor != null) {
- replyProcessor.enableSevereAlertProcessing();
- }
- }
-
- public ClearResponse send(DistributedMember recipient, PartitionedRegion region)
- throws ForceReattemptException {
- Set<InternalDistributedMember> recipients =
- Collections.singleton((InternalDistributedMember) recipient);
- ClearResponse clearResponse = new ClearResponse(region.getSystem(), recipients);
- initMessage(region, recipients, clearResponse);
- if (logger.isDebugEnabled()) {
- logger.debug("ClearPRMessage.send: recipient is {}, msg is {}", recipient, this);
- }
-
- Set<InternalDistributedMember> failures = region.getDistributionManager().putOutgoing(this);
- if (failures != null && failures.size() > 0) {
- throw new ForceReattemptException("Failed sending <" + this + "> due to " + failures);
- }
- return clearResponse;
- }
-
- @Override
- public int getDSFID() {
- return PR_CLEAR_MESSAGE;
- }
-
- @Override
- public void toData(DataOutput out, SerializationContext context) throws IOException {
- super.toData(out, context);
- if (bucketId == null) {
- InternalDataSerializer.writeSignedVL(-1, out);
- } else {
- InternalDataSerializer.writeSignedVL(bucketId, out);
- }
- DataSerializer.writeObject(this.eventID, out);
- }
-
- @Override
- public void fromData(DataInput in, DeserializationContext context)
- throws IOException, ClassNotFoundException {
- super.fromData(in, context);
- this.bucketId = (int) InternalDataSerializer.readSignedVL(in);
- this.eventID = (EventID) DataSerializer.readObject(in);
- }
-
- @Override
- public EventID getEventID() {
- return null;
- }
-
- /**
- * This method is called upon receipt and make the desired changes to the PartitionedRegion Note:
- * It is very important that this message does NOT cause any deadlocks as the sender will wait
- * indefinitely for the acknowledgement
- */
- @Override
- @VisibleForTesting
- protected boolean operateOnPartitionedRegion(ClusterDistributionManager distributionManager,
- PartitionedRegion region, long startTime) {
- try {
- this.result = doLocalClear(region);
- } catch (ForceReattemptException ex) {
- sendReply(getSender(), getProcessorId(), distributionManager, new ReplyException(ex), region,
- startTime);
- return false;
- }
- return this.result;
- }
-
- public Integer getBucketId() {
- return this.bucketId;
- }
-
- public boolean doLocalClear(PartitionedRegion region)
- throws ForceReattemptException {
- // Retrieve local bucket region which matches target bucketId
- BucketRegion bucketRegion =
- region.getDataStore().getInitializedBucketForId(null, this.bucketId);
-
- boolean lockedForPrimary = bucketRegion.doLockForPrimary(false);
- // Check if we obtained primary lock, throw exception if not
- if (!lockedForPrimary) {
- throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
- }
- try {
- RegionEventImpl regionEvent = new RegionEventImpl(bucketRegion, Operation.REGION_CLEAR, null,
- false, region.getMyId(), eventID);
- bucketRegion.cmnClearRegion(regionEvent, false, true);
- } catch (PartitionOfflineException poe) {
- logger.info(
- "All members holding data for bucket {} are offline, no more retries will be attempted",
- this.bucketId,
- poe);
- throw poe;
- } catch (Exception ex) {
- throw new ForceReattemptException(
- EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex);
- } finally {
- bucketRegion.doUnlockForPrimary();
- }
-
- return true;
- }
-
- @Override
- public boolean canStartRemoteTransaction() {
- return false;
- }
-
- @Override
- protected void sendReply(InternalDistributedMember member, int processorId,
- DistributionManager distributionManager, ReplyException ex,
- PartitionedRegion partitionedRegion, long startTime) {
- if (partitionedRegion != null) {
- if (startTime > 0) {
- partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime);
- }
- }
- ClearReplyMessage.send(member, processorId, getReplySender(distributionManager), this.result,
- ex);
- }
-
- @Override
- protected void appendFields(StringBuilder buff) {
- super.appendFields(buff);
- buff.append("; bucketId=").append(this.bucketId);
- }
-
- public static class ClearReplyMessage extends ReplyMessage {
- @Override
- public boolean getInlineProcess() {
- return true;
- }
-
- /**
- * Empty constructor to conform to DataSerializable interface
- */
- @SuppressWarnings("unused")
- public ClearReplyMessage() {}
-
- private ClearReplyMessage(int processorId, boolean result, ReplyException ex) {
- super();
- setProcessorId(processorId);
- if (ex != null) {
- setException(ex);
- } else {
- setReturnValue(result);
- }
- }
-
- /**
- * Send an ack
- */
- public static void send(InternalDistributedMember recipient, int processorId,
- ReplySender replySender,
- boolean result, ReplyException ex) {
- Assert.assertNotNull(recipient, "ClearReplyMessage recipient was NULL.");
- ClearReplyMessage message = new ClearReplyMessage(processorId, result, ex);
- message.setRecipient(recipient);
- replySender.putOutgoing(message);
- }
-
- /**
- * Processes this message. This method is invoked by the receiver of the message.
- *
- * @param distributionManager the distribution manager that is processing the message.
- */
- @Override
- public void process(final DistributionManager distributionManager,
- final ReplyProcessor21 replyProcessor) {
- final long startTime = getTimestamp();
- if (replyProcessor == null) {
- if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
- logger.trace(LogMarker.DM_VERBOSE, "{}: processor not found", this);
- }
- return;
- }
- if (replyProcessor instanceof ClearResponse) {
- ((ClearResponse) replyProcessor).setResponse(this);
- }
- replyProcessor.process(this);
-
- if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
- logger.trace(LogMarker.DM_VERBOSE, "{} processed {}", replyProcessor, this);
- }
- distributionManager.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime);
- }
-
- @Override
- public int getDSFID() {
- return PR_CLEAR_REPLY_MESSAGE;
- }
-
- @Override
- public String toString() {
- StringBuilder stringBuilder = new StringBuilder(super.toString());
- stringBuilder.append(" returnValue=");
- stringBuilder.append(getReturnValue());
- return stringBuilder.toString();
- }
- }
-
- /**
- * A processor to capture the value returned by {@link ClearPRMessage}
- */
- public static class ClearResponse extends PartitionResponse {
- private volatile boolean returnValue;
-
- public ClearResponse(InternalDistributedSystem distributedSystem,
- Set<InternalDistributedMember> recipients) {
- super(distributedSystem, recipients, false);
- }
-
- public void setResponse(ClearReplyMessage response) {
- if (response.getException() == null) {
- this.returnValue = (boolean) response.getReturnValue();
- }
- }
-
- /**
- * @return the result of the remote clear operation
- * @throws ForceReattemptException if the peer is no longer available
- * @throws CacheException if the peer generates an error
- */
- public boolean waitForResult() throws CacheException, ForceReattemptException {
- waitForCacheException();
- return this.returnValue;
- }
- }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
index f99b74b..2a2897d 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
@@ -40,7 +40,6 @@ import static org.mockito.quality.Strictness.STRICT_STUBS;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -73,7 +72,6 @@ import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.control.InternalResourceManager;
-import org.apache.geode.internal.cache.partitioned.ClearPRMessage;
import org.apache.geode.internal.cache.partitioned.colocation.ColocationLoggerFactory;
@RunWith(JUnitParamsRunner.class)
@@ -217,19 +215,6 @@ public class PartitionedRegionTest {
}
@Test
- public void createClearPRMessagesShouldCreateMessagePerBucket() {
- PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
- RegionEventImpl regionEvent =
- new RegionEventImpl(spyPartitionedRegion, Operation.REGION_CLEAR, null, false,
- spyPartitionedRegion.getMyId(), true);
- when(spyPartitionedRegion.getTotalNumberOfBuckets()).thenReturn(3);
- EventID eventID = new EventID(spyPartitionedRegion.getCache().getDistributedSystem());
- List<ClearPRMessage> msgs = spyPartitionedRegion.createClearPRMessages(eventID);
- assertThat(msgs.size()).isEqualTo(3);
- }
-
-
- @Test
public void getBucketNodeForReadOrWriteReturnsPrimaryNodeForRegisterInterest() {
// ARRANGE
EntryEventImpl clientEvent = mock(EntryEventImpl.class);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
deleted file mode 100644
index acdd4fc..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.partitioned;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.notNull;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.geode.distributed.internal.ClusterDistributionManager;
-import org.apache.geode.distributed.internal.DMStats;
-import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.ReplyException;
-import org.apache.geode.distributed.internal.ReplySender;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.BucketRegion;
-import org.apache.geode.internal.cache.ForceReattemptException;
-import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.PartitionedRegionDataStore;
-import org.apache.geode.internal.cache.PartitionedRegionStats;
-import org.apache.geode.internal.cache.RegionEventImpl;
-
-public class ClearPRMessageTest {
-
- ClearPRMessage message;
- PartitionedRegion region;
- PartitionedRegionDataStore dataStore;
- BucketRegion bucketRegion;
-
- @Before
- public void setup() throws ForceReattemptException {
- message = spy(new ClearPRMessage());
- InternalDistributedMember member = mock(InternalDistributedMember.class);
- region = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
- dataStore = mock(PartitionedRegionDataStore.class);
- when(region.getDataStore()).thenReturn(dataStore);
- when(region.getFullPath()).thenReturn("/test");
- bucketRegion = mock(BucketRegion.class);
- when(dataStore.getInitializedBucketForId(any(), any())).thenReturn(bucketRegion);
- RegionEventImpl bucketRegionEventImpl = mock(RegionEventImpl.class);
- }
-
- @Test
- public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAtFirstCheck() {
- when(bucketRegion.isPrimary()).thenReturn(false);
-
- assertThatThrownBy(() -> message.doLocalClear(region))
- .isInstanceOf(ForceReattemptException.class)
- .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
- }
-
- @Test
- public void doLocalClearThrowsExceptionWhenLockCannotBeObtained() {
- when(bucketRegion.doLockForPrimary(false)).thenReturn(false);
-
- assertThatThrownBy(() -> message.doLocalClear(region))
- .isInstanceOf(ForceReattemptException.class)
- .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
- }
-
- @Test
- public void doLocalClearThrowsForceReattemptExceptionWhenAnExceptionIsThrownDuringClearOperation() {
- NullPointerException exception = new NullPointerException("Error encountered");
- doThrow(exception).when(bucketRegion).cmnClearRegion(any(), anyBoolean(), anyBoolean());
-
- when(bucketRegion.doLockForPrimary(false)).thenReturn(true);
-
- assertThatThrownBy(() -> message.doLocalClear(region))
- .isInstanceOf(ForceReattemptException.class)
- .hasMessageContaining(ClearPRMessage.EXCEPTION_THROWN_DURING_CLEAR_OPERATION);
-
- // Confirm that cmnClearRegion was called
- verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean());
- }
-
- @Test
- public void doLocalClearInvokesCmnClearRegionWhenBucketIsPrimaryAndLockIsObtained()
- throws ForceReattemptException {
-
- // Be primary on the first check, then be not primary on the second check
- when(bucketRegion.doLockForPrimary(false)).thenReturn(true);
- assertThat(message.doLocalClear(region)).isTrue();
-
- // Confirm that cmnClearRegion was called
- verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean());
- }
-
- @Test
- public void initMessageSetsReplyProcessorCorrectlyWithDefinedReplyProcessor() {
- InternalDistributedMember sender = mock(InternalDistributedMember.class);
-
- Set<InternalDistributedMember> recipients = new HashSet<>();
- recipients.add(sender);
-
- ClearPRMessage.ClearResponse mockProcessor = mock(ClearPRMessage.ClearResponse.class);
- int mockProcessorId = 5;
- when(mockProcessor.getProcessorId()).thenReturn(mockProcessorId);
-
- message.initMessage(region, recipients, mockProcessor);
-
- verify(mockProcessor, times(1)).enableSevereAlertProcessing();
- assertThat(message.getProcessorId()).isEqualTo(mockProcessorId);
- }
-
- @Test
- public void initMessageSetsProcessorIdToZeroWithNullProcessor() {
- message.initMessage(region, null, null);
-
- assertThat(message.getProcessorId()).isEqualTo(0);
- }
-
- @Test
- public void sendThrowsExceptionIfPutOutgoingMethodReturnsNonNullSetOfFailures() {
- InternalDistributedMember recipient = mock(InternalDistributedMember.class);
-
- DistributionManager distributionManager = mock(DistributionManager.class);
- when(region.getDistributionManager()).thenReturn(distributionManager);
-
- doNothing().when(message).initMessage(any(), any(), any());
- Set<InternalDistributedMember> failures = new HashSet<>();
- failures.add(recipient);
-
- when(distributionManager.putOutgoing(message)).thenReturn(failures);
-
- assertThatThrownBy(() -> message.send(recipient, region))
- .isInstanceOf(ForceReattemptException.class)
- .hasMessageContaining("Failed sending <" + message + ">");
- }
-
- @SuppressWarnings("ResultOfMethodCallIgnored")
- @Test
- public void operateOnPartitionedRegionCallsSendReplyWithNoExceptionWhenDoLocalClearSucceeds()
- throws ForceReattemptException {
- ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class);
- InternalDistributedMember sender = mock(InternalDistributedMember.class);
- int processorId = 1000;
- int startTime = 0;
-
- doReturn(0).when(message).getBucketId();
- doReturn(true).when(message).doLocalClear(region);
- doReturn(sender).when(message).getSender();
- doReturn(processorId).when(message).getProcessorId();
-
- // We don't want to deal with mocking the behavior of sendReply() in this test, so we mock it to
- // do nothing and verify later that it was called with proper input
- doNothing().when(message).sendReply(any(), anyInt(), any(), any(), any(), anyLong());
-
- message.operateOnPartitionedRegion(distributionManager, region, startTime);
- assertThat(message.result).isTrue();
-
- verify(message, times(0)).sendReply(sender, processorId, distributionManager, null, region,
- startTime);
- }
-
- @SuppressWarnings("ResultOfMethodCallIgnored")
- @Test
- public void operateOnPartitionedRegionCallsSendReplyWithExceptionWhenDoLocalClearFailsWithException()
- throws ForceReattemptException {
- ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class);
- InternalDistributedMember sender = mock(InternalDistributedMember.class);
- int processorId = 1000;
- int startTime = 0;
- ForceReattemptException exception =
- new ForceReattemptException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
-
- doReturn(0).when(message).getBucketId();
- doThrow(exception).when(message).doLocalClear(region);
- doReturn(sender).when(message).getSender();
- doReturn(processorId).when(message).getProcessorId();
-
- // We don't want to deal with mocking the behavior of sendReply() in this test, so we mock it to
- // do nothing and verify later that it was called with proper input
- doNothing().when(message).sendReply(any(), anyInt(), any(), any(), any(), anyLong());
-
- message.operateOnPartitionedRegion(distributionManager, region, startTime);
-
- verify(message, times(1)).sendReply(any(), anyInt(), any(), notNull(), any(), anyLong());
- }
-
- @Test
- public void sendReplyEndsMessageProcessingIfWeHaveARegionAndHaveStartedProcessing() {
- DistributionManager distributionManager = mock(DistributionManager.class);
- InternalDistributedMember recipient = mock(InternalDistributedMember.class);
- PartitionedRegionStats partitionedRegionStats = mock(PartitionedRegionStats.class);
- when(region.getPrStats()).thenReturn(partitionedRegionStats);
-
- int processorId = 1000;
- int startTime = 10000;
- ReplyException exception = new ReplyException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
-
- ReplySender replySender = mock(ReplySender.class);
- doReturn(replySender).when(message).getReplySender(distributionManager);
-
- message.sendReply(recipient, processorId, distributionManager, exception, region, startTime);
-
- verify(partitionedRegionStats, times(1)).endPartitionMessagesProcessing(startTime);
- }
-
- @Test
- public void sendReplyDoesNotEndMessageProcessingIfStartTimeIsZero() {
- DistributionManager distributionManager = mock(DistributionManager.class);
- InternalDistributedMember recipient = mock(InternalDistributedMember.class);
- PartitionedRegionStats partitionedRegionStats = mock(PartitionedRegionStats.class);
- when(region.getPrStats()).thenReturn(partitionedRegionStats);
-
- int processorId = 1000;
- int startTime = 0;
- ReplyException exception = new ReplyException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
-
- ReplySender replySender = mock(ReplySender.class);
- doReturn(replySender).when(message).getReplySender(distributionManager);
-
- message.sendReply(recipient, processorId, distributionManager, exception, region, startTime);
-
- verify(partitionedRegionStats, times(0)).endPartitionMessagesProcessing(startTime);
- }
-
- @Test
- public void clearReplyMessageProcessCallsSetResponseIfReplyProcessorIsInstanceOfClearResponse() {
- DistributionManager distributionManager = mock(DistributionManager.class);
- DMStats mockStats = mock(DMStats.class);
- when(distributionManager.getStats()).thenReturn(mockStats);
- ClearPRMessage.ClearReplyMessage clearReplyMessage = new ClearPRMessage.ClearReplyMessage();
- ClearPRMessage.ClearResponse mockProcessor = mock(ClearPRMessage.ClearResponse.class);
-
- clearReplyMessage.process(distributionManager, mockProcessor);
-
- verify(mockProcessor, times(1)).setResponse(clearReplyMessage);
- }
-}