You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2020/07/09 18:59:46 UTC
[geode] 01/13: GEODE-7683: introduce BR.cmnClearRegion
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 3e8304cfa9df6645cb3082931ad4ff1ddd390dc1
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Mon Jan 27 17:02:48 2020 -0800
GEODE-7683: introduce BR.cmnClearRegion
Co-authored-by: Xiaojian Zhou <gz...@pivotal.io>
GEODE-7684: Create messaging class for PR Clear (#4689)
* Added new message class and test
Co-authored-by: Benjamin Ross <br...@pivotal.io>
Co-authored-by: Donal Evans <do...@pivotal.io>
---
.../codeAnalysis/sanctionedDataSerializables.txt | 8 +
.../apache/geode/internal/cache/BucketRegion.java | 38 +-
.../geode/internal/cache/DistributedRegion.java | 23 +-
.../internal/cache/partitioned/ClearPRMessage.java | 388 +++++++++++++++++++++
.../internal/cache/BucketRegionJUnitTest.java | 77 ++++
.../internal/cache/DistributedRegionJUnitTest.java | 18 +
.../cache/partitioned/ClearPRMessageTest.java | 288 +++++++++++++++
.../serialization/DataSerializableFixedID.java | 3 +
8 files changed, 832 insertions(+), 11 deletions(-)
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 733737e..6dd3a34 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1368,6 +1368,14 @@ org/apache/geode/internal/cache/partitioned/BucketSizeMessage$BucketSizeReplyMes
fromData,27
toData,27
+org/apache/geode/internal/cache/partitioned/ClearPRMessage,2
+fromData,30
+toData,44
+
+org/apache/geode/internal/cache/partitioned/ClearPRMessage$ClearReplyMessage,2
+fromData,17
+toData,17
+
org/apache/geode/internal/cache/partitioned/ColocatedRegionDetails,2
fromData,81
toData,133
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index d91786f..e4fa7ef 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -557,6 +557,36 @@ public class BucketRegion extends DistributedRegion implements Bucket {
}
}
+ @Override
+ public void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) {
+ if (!getBucketAdvisor().isPrimary()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Not primary bucket when doing clear, do nothing");
+ }
+ return;
+ }
+
+ boolean enableRVV = useRVV && getConcurrencyChecksEnabled();
+ RegionVersionVector rvv = null;
+ if (enableRVV) {
+ rvv = getVersionVector().getCloneForTransmission();
+ }
+
+ // obtain the write locks for clear, then clear locally and distribute the clear operation
+ Set<InternalDistributedMember> participants =
+ getCacheDistributionAdvisor().adviseInvalidateRegion();
+ try {
+ obtainWriteLocksForClear(regionEvent, participants);
+ // no need to dominate my own rvv.
+ // Clear is ongoing here; there won't be GII for this member
+ clearRegionLocally(regionEvent, cacheWrite, null);
+ distributeClearOperation(regionEvent, rvv, participants);
+
+ // TODO: call reindexUserDataRegion if there are Lucene indexes
+ } finally {
+ releaseWriteLocksForClear(regionEvent, participants);
+ }
+ }
long generateTailKey() {
long key = eventSeqNum.addAndGet(partitionedRegion.getTotalNumberOfBuckets());
@@ -2093,11 +2123,9 @@ public class BucketRegion extends DistributedRegion implements Bucket {
// if GII has failed, because there is not primary. So it's safe to set these
// counters to 0.
oldMemValue = bytesInMemory.getAndSet(0);
- }
-
- else {
- throw new InternalGemFireError(
- "Trying to clear a bucket region that was not destroyed or in initialization.");
+ } else {
+ // BucketRegion's clear is supported now
+ oldMemValue = bytesInMemory.getAndSet(0);
}
if (oldMemValue != BUCKET_DESTROYED) {
partitionedRegion.getPrStats().incDataStoreEntryCount(-sizeBeforeClear);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index b822dde..489d85a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2013,6 +2013,10 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
super.basicClear(regionEvent, cacheWrite);
}
+ void distributeClearOperation(RegionEventImpl regionEvent, RegionVersionVector rvv,
+ Set<InternalDistributedMember> participants) {
+ DistributedClearOperation.clear(regionEvent, rvv, participants);
+ }
@Override
void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) {
@@ -2035,7 +2039,7 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
obtainWriteLocksForClear(regionEvent, participants);
clearRegionLocally(regionEvent, cacheWrite, null);
if (!regionEvent.isOriginRemote() && regionEvent.getOperation().isDistributed()) {
- DistributedClearOperation.clear(regionEvent, null, participants);
+ distributeClearOperation(regionEvent, null, participants);
}
} finally {
releaseWriteLocksForClear(regionEvent, participants);
@@ -2091,10 +2095,12 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
/**
* obtain locks preventing generation of new versions in other members
*/
- private void obtainWriteLocksForClear(RegionEventImpl regionEvent,
+ protected void obtainWriteLocksForClear(RegionEventImpl regionEvent,
Set<InternalDistributedMember> participants) {
lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent);
- DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants);
+ if (!isUsedForPartitionedRegionBucket()) {
+ DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants);
+ }
}
/**
@@ -2131,7 +2137,7 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
/**
* releases the locks obtained in obtainWriteLocksForClear
*/
- private void releaseWriteLocksForClear(RegionEventImpl regionEvent,
+ protected void releaseWriteLocksForClear(RegionEventImpl regionEvent,
Set<InternalDistributedMember> participants) {
ARMLockTestHook armLockTestHook = getRegionMap().getARMLockTestHook();
@@ -2139,8 +2145,13 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
armLockTestHook.beforeRelease(this, regionEvent);
}
- getVersionVector().unlockForClear(getMyId());
- DistributedClearOperation.releaseLocks(regionEvent, participants);
+ RegionVersionVector rvv = getVersionVector();
+ if (rvv != null) {
+ rvv.unlockForClear(getMyId());
+ }
+ if (!isUsedForPartitionedRegionBucket()) {
+ DistributedClearOperation.releaseLocks(regionEvent, participants);
+ }
if (armLockTestHook != null) {
armLockTestHook.afterRelease(this, regionEvent);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
new file mode 100644
index 0000000..1a8aba1
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.partitioned;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.distributed.DistributedLockService;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DirectReplyProcessor;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.distributed.internal.ReplySender;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.InternalDataSerializer;
+import org.apache.geode.internal.NanoTimer;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.ForceReattemptException;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.internal.cache.RegionEventImpl;
+import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+public class ClearPRMessage extends PartitionMessageWithDirectReply {
+ private static final Logger logger = LogService.getLogger();
+
+ private RegionEventImpl regionEvent;
+
+ private Integer bucketId;
+
+ /** The time in ms to wait for a lock to be obtained during doLocalClear() */
+ public static final int LOCK_WAIT_TIMEOUT_MS = 1000;
+ public static final String BUCKET_NON_PRIMARY_MESSAGE =
+ "The bucket region on target member is no longer primary";
+ public static final String BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE =
+ "A lock for the bucket region could not be obtained.";
+ public static final String EXCEPTION_THROWN_DURING_CLEAR_OPERATION =
+ "An exception was thrown during the local clear operation: ";
+
+ /**
+ * state from operateOnRegion that must be preserved for transmission from the waiting pool
+ */
+ transient boolean result = false;
+
+ /**
+ * Empty constructor to satisfy {@link DataSerializer} requirements
+ */
+ public ClearPRMessage() {}
+
+ public ClearPRMessage(int bucketId) {
+ this.bucketId = bucketId;
+
+ // These are both used by the parent class, but don't apply to this message type
+ this.notificationOnly = false;
+ this.posDup = false;
+ }
+
+ public void setRegionEvent(RegionEventImpl event) {
+ regionEvent = event;
+ }
+
+ public void initMessage(PartitionedRegion region, Set<InternalDistributedMember> recipients,
+ DirectReplyProcessor replyProcessor) {
+ this.resetRecipients();
+ if (recipients != null) {
+ setRecipients(recipients);
+ }
+ this.regionId = region.getPRId();
+ this.processor = replyProcessor;
+ this.processorId = replyProcessor == null ? 0 : replyProcessor.getProcessorId();
+ if (replyProcessor != null) {
+ replyProcessor.enableSevereAlertProcessing();
+ }
+ }
+
+ @Override
+ public boolean isSevereAlertCompatible() {
+ // allow forced-disconnect processing for all cache op messages
+ return true;
+ }
+
+ public RegionEventImpl getRegionEvent() {
+ return regionEvent;
+ }
+
+ public ClearResponse send(DistributedMember recipient, PartitionedRegion region)
+ throws ForceReattemptException {
+ Set<InternalDistributedMember> recipients =
+ Collections.singleton((InternalDistributedMember) recipient);
+ ClearResponse clearResponse = new ClearResponse(region.getSystem(), recipients);
+ initMessage(region, recipients, clearResponse);
+ if (logger.isDebugEnabled()) {
+ logger.debug("ClearPRMessage.send: recipient is {}, msg is {}", recipient, this);
+ }
+
+ Set<InternalDistributedMember> failures = region.getDistributionManager().putOutgoing(this);
+ if (failures != null && failures.size() > 0) {
+ throw new ForceReattemptException("Failed sending <" + this + ">");
+ }
+ return clearResponse;
+ }
+
+ @Override
+ public int getDSFID() {
+ return PR_CLEAR_MESSAGE;
+ }
+
+ @Override
+ public void toData(DataOutput out, SerializationContext context) throws IOException {
+ super.toData(out, context);
+ if (bucketId == null) {
+ InternalDataSerializer.writeSignedVL(-1, out);
+ } else {
+ InternalDataSerializer.writeSignedVL(bucketId, out);
+ }
+ DataSerializer.writeObject(regionEvent, out);
+ }
+
+ @Override
+ public void fromData(DataInput in, DeserializationContext context)
+ throws IOException, ClassNotFoundException {
+ super.fromData(in, context);
+ this.bucketId = (int) InternalDataSerializer.readSignedVL(in);
+ this.regionEvent = DataSerializer.readObject(in);
+ }
+
+ @Override
+ public EventID getEventID() {
+ return regionEvent.getEventId();
+ }
+
+ /**
+ * This method is called upon receipt and makes the desired changes to the PartitionedRegion.
+ * Note: It is very important that this message does NOT cause any deadlocks as the sender will
+ * wait indefinitely for the acknowledgement.
+ */
+ @Override
+ @VisibleForTesting
+ protected boolean operateOnPartitionedRegion(ClusterDistributionManager distributionManager,
+ PartitionedRegion region, long startTime) {
+ try {
+ result = doLocalClear(region);
+ } catch (ForceReattemptException ex) {
+ sendReply(getSender(), getProcessorId(), distributionManager, new ReplyException(ex), region,
+ startTime);
+ return false;
+ }
+ sendReply(getSender(), getProcessorId(), distributionManager, null, region, startTime);
+ return false;
+ }
+
+ /** Clears the local bucket for bucketId while holding the distributed lock for that bucket. */
+ public boolean doLocalClear(PartitionedRegion region) throws ForceReattemptException {
+ // Retrieve local bucket region which matches target bucketId
+ BucketRegion bucketRegion = region.getDataStore().getInitializedBucketForId(null, bucketId);
+
+ // Check if we are primary, throw exception if not
+ if (!bucketRegion.isPrimary()) {
+ throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
+ }
+
+ DistributedLockService lockService = getPartitionRegionLockService();
+ String lockName = bucketRegion.getFullPath();
+
+ // Acquire the lock before entering the try block: if it cannot be obtained, the finally
+ // clause must not try to release a lock this thread never acquired; an exception from that
+ // unlock() would mask the ForceReattemptException thrown here
+ if (!lockService.lock(lockName, LOCK_WAIT_TIMEOUT_MS, -1)) {
+ throw new ForceReattemptException(BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE);
+ }
+
+ try {
+ // Double check if we are still primary, as this could have changed between our first check
+ // and obtaining the lock
+ if (!bucketRegion.isPrimary()) {
+ throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
+ }
+ try {
+ bucketRegion.cmnClearRegion(regionEvent, true, true);
+ } catch (Exception ex) {
+ throw new ForceReattemptException(
+ EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex);
+ }
+
+ } finally {
+ lockService.unlock(lockName);
+ }
+ return true;
+ }
+
+ // Extracted for testing
+ protected DistributedLockService getPartitionRegionLockService() {
+ return DistributedLockService
+ .getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
+ }
+
+ @Override
+ public boolean canStartRemoteTransaction() {
+ return false;
+ }
+
+ @Override
+ protected void sendReply(InternalDistributedMember member, int processorId,
+ DistributionManager distributionManager, ReplyException ex,
+ PartitionedRegion partitionedRegion, long startTime) {
+ if (partitionedRegion != null) {
+ if (startTime > 0) {
+ partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime);
+ }
+ }
+ ClearReplyMessage.send(member, processorId, getReplySender(distributionManager), this.result,
+ ex);
+ }
+
+ @Override
+ protected void appendFields(StringBuilder buff) {
+ super.appendFields(buff);
+ buff.append("; bucketId=").append(this.bucketId);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buff = new StringBuilder();
+ String className = getClass().getName();
+ buff.append(className.substring(className.indexOf(PN_TOKEN) + PN_TOKEN.length())); // partition.<foo>
+ buff.append("(prid="); // make sure this is the first one
+ buff.append(this.regionId);
+
+ // Append name, if we have it
+ String name = null;
+ try {
+ PartitionedRegion region = PartitionedRegion.getPRFromId(this.regionId);
+ if (region != null) {
+ name = region.getFullPath();
+ }
+ } catch (Exception ignore) {
+ /* ignored */
+ }
+ if (name != null) {
+ buff.append(" (name = \"").append(name).append("\")");
+ }
+
+ appendFields(buff);
+ buff.append(" ,distTx=");
+ buff.append(this.isTransactionDistributed);
+ buff.append(")");
+ return buff.toString();
+ }
+
+ public static class ClearReplyMessage extends ReplyMessage {
+ /** Result of the Clear operation */
+ boolean result;
+
+ @Override
+ public boolean getInlineProcess() {
+ return true;
+ }
+
+ /**
+ * Empty constructor to conform to DataSerializable interface
+ */
+ @SuppressWarnings("unused")
+ public ClearReplyMessage() {}
+
+ private ClearReplyMessage(int processorId, boolean result, ReplyException ex) {
+ super();
+ this.result = result;
+ setProcessorId(processorId);
+ setException(ex);
+ }
+
+ /** Send an ack */
+ public static void send(InternalDistributedMember recipient, int processorId,
+ ReplySender replySender,
+ boolean result, ReplyException ex) {
+ Assert.assertTrue(recipient != null, "ClearReplyMessage NULL reply message");
+ ClearReplyMessage message = new ClearReplyMessage(processorId, result, ex);
+ message.setRecipient(recipient);
+ replySender.putOutgoing(message);
+ }
+
+ /**
+ * Processes this message. This method is invoked by the receiver of the message.
+ *
+ * @param distributionManager the distribution manager that is processing the message.
+ */
+ @Override
+ public void process(final DistributionManager distributionManager,
+ final ReplyProcessor21 replyProcessor) {
+ final long startTime = getTimestamp();
+ if (replyProcessor == null) {
+ if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
+ logger.trace(LogMarker.DM_VERBOSE, "{}: processor not found", this);
+ }
+ return;
+ }
+ if (replyProcessor instanceof ClearResponse) {
+ ((ClearResponse) replyProcessor).setResponse(this);
+ }
+ replyProcessor.process(this);
+
+ if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
+ logger.trace(LogMarker.DM_VERBOSE, "{} processed {}", replyProcessor, this);
+ }
+ distributionManager.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime);
+ }
+
+ @Override
+ public int getDSFID() {
+ return PR_CLEAR_REPLY_MESSAGE;
+ }
+
+ @Override
+ public void fromData(DataInput in,
+ DeserializationContext context) throws IOException, ClassNotFoundException {
+ super.fromData(in, context);
+ this.result = in.readBoolean();
+ }
+
+ @Override
+ public void toData(DataOutput out,
+ SerializationContext context) throws IOException {
+ super.toData(out, context);
+ out.writeBoolean(this.result);
+ }
+
+ @Override
+ public String toString() {
+ return "ClearReplyMessage " + "processorid=" + this.processorId + " returning " + this.result
+ + " exception=" + getException();
+ }
+ }
+
+ /**
+ * A processor to capture the value returned by {@link ClearPRMessage}
+ */
+ public static class ClearResponse extends PartitionResponse {
+ private volatile boolean returnValue;
+
+ public ClearResponse(InternalDistributedSystem distributedSystem,
+ Set<InternalDistributedMember> recipients) {
+ super(distributedSystem, recipients, false);
+ }
+
+ public void setResponse(ClearReplyMessage response) {
+ this.returnValue = response.result;
+ }
+
+ /**
+ * @return the result of the remote clear operation
+ * @throws ForceReattemptException if the peer is no longer available
+ * @throws CacheException if the peer generates an error
+ */
+ public boolean waitForResult() throws CacheException, ForceReattemptException {
+ waitForCacheException();
+ return this.returnValue;
+ }
+ }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
index 72e6657..c7cf5a6 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
@@ -14,7 +14,9 @@
*/
package org.apache.geode.internal.cache;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.anyLong;
@@ -31,7 +33,10 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.junit.Test;
+
import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.statistics.StatisticsClock;
public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
@@ -128,4 +133,76 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
}
}
+ @Test
+ public void cmnClearRegionWillDoNothingIfNotPrimary() {
+ RegionEventImpl event = createClearRegionEvent();
+ BucketRegion region = (BucketRegion) event.getRegion();
+ BucketAdvisor ba = mock(BucketAdvisor.class);
+ RegionVersionVector rvv = mock(RegionVersionVector.class);
+ doReturn(rvv).when(region).getVersionVector();
+ doReturn(ba).when(region).getBucketAdvisor();
+ when(ba.isPrimary()).thenReturn(false);
+ region.cmnClearRegion(event, true, true);
+ verify(region, never()).clearRegionLocally(eq(event), eq(true), eq(rvv));
+ }
+
+ @Test
+ public void cmnClearRegionCalledOnPrimary() {
+ RegionEventImpl event = createClearRegionEvent();
+ BucketRegion region = (BucketRegion) event.getRegion();
+ BucketAdvisor ba = mock(BucketAdvisor.class);
+ RegionVersionVector rvv = mock(RegionVersionVector.class);
+ doReturn(rvv).when(region).getVersionVector();
+ doReturn(true).when(region).getConcurrencyChecksEnabled();
+ doReturn(ba).when(region).getBucketAdvisor();
+ doNothing().when(region).distributeClearOperation(any(), any(), any());
+ doNothing().when(region).lockLocallyForClear(any(), any(), any());
+ doNothing().when(region).clearRegionLocally(event, true, null);
+ when(ba.isPrimary()).thenReturn(true);
+ region.cmnClearRegion(event, true, true);
+ verify(region, times(1)).clearRegionLocally(eq(event), eq(true), eq(null));
+ }
+
+ @Test
+ public void clearWillUseNullAsRVVWhenConcurrencyCheckDisabled() {
+ RegionEventImpl event = createClearRegionEvent();
+ BucketRegion region = (BucketRegion) event.getRegion();
+ BucketAdvisor ba = mock(BucketAdvisor.class);
+ doReturn(false).when(region).getConcurrencyChecksEnabled();
+ doReturn(ba).when(region).getBucketAdvisor();
+ doNothing().when(region).distributeClearOperation(any(), any(), any());
+ doNothing().when(region).lockLocallyForClear(any(), any(), any());
+ doNothing().when(region).clearRegionLocally(event, true, null);
+ when(ba.isPrimary()).thenReturn(true);
+ region.cmnClearRegion(event, true, true);
+ verify(region, times(1)).clearRegionLocally(eq(event), eq(true), eq(null));
+ }
+
+ @Test
+ public void obtainWriteLocksForClearInBRShouldNotDistribute() {
+ RegionEventImpl event = createClearRegionEvent();
+ BucketRegion region = (BucketRegion) event.getRegion();
+ doNothing().when(region).lockLocallyForClear(any(), any(), any());
+ region.obtainWriteLocksForClear(event, null);
+ assertTrue(region.isUsedForPartitionedRegionBucket());
+ }
+
+ @Test
+ public void updateSizeToZeroOnClearBucketRegion() {
+ RegionEventImpl event = createClearRegionEvent();
+ BucketRegion region = (BucketRegion) event.getRegion();
+ PartitionedRegion pr = region.getPartitionedRegion();
+ PartitionedRegionDataStore prds = mock(PartitionedRegionDataStore.class);
+ PartitionedRegionStats prStats = mock(PartitionedRegionStats.class);
+ when(pr.getPrStats()).thenReturn(prStats);
+ doNothing().when(prStats).incDataStoreEntryCount(anyInt());
+ doNothing().when(prds).updateMemoryStats(anyInt());
+ when(pr.getDataStore()).thenReturn(prds);
+ region.updateSizeOnCreate("key1", 20);
+ long sizeBeforeClear = region.getTotalBytes();
+ assertEquals(20, sizeBeforeClear);
+ region.updateSizeOnClearRegion((int) sizeBeforeClear);
+ long sizeAfterClear = region.getTotalBytes();
+ assertEquals(0, sizeAfterClear);
+ }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
index 9fbd8fc..ca53ced 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.internal.cache;
+import static org.apache.geode.internal.Assert.fail;
import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
@@ -53,6 +54,14 @@ public class DistributedRegionJUnitTest
@Override
protected void setInternalRegionArguments(InternalRegionArguments ira) {}
+ protected RegionEventImpl createClearRegionEvent() {
+ DistributedRegion region = prepare(true, true);
+ DistributedMember member = mock(DistributedMember.class);
+ RegionEventImpl regionEvent = new RegionEventImpl(region, Operation.REGION_CLEAR, null, false,
+ member, true);
+ return regionEvent;
+ }
+
@Override
protected DistributedRegion createAndDefineRegion(boolean isConcurrencyChecksEnabled,
RegionAttributes ra, InternalRegionArguments ira, GemFireCacheImpl cache,
@@ -246,4 +255,13 @@ public class DistributedRegionJUnitTest
region.basicBridgeReplace("key1", "value1", false, null, client, true, clientEvent);
assertThat(clientEvent.getVersionTag().equals(tag));
}
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void localClearIsNotSupportedOnReplicatedRegion() {
+ RegionEventImpl event = createClearRegionEvent();
+ DistributedRegion region = (DistributedRegion) event.getRegion();
+ region.basicLocalClear(event);
+ fail("Expect UnsupportedOperationException");
+ }
+
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
new file mode 100644
index 0000000..2cf5231
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.notNull;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.distributed.DistributedLockService;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.ReplySender;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.ForceReattemptException;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionDataStore;
+import org.apache.geode.internal.cache.PartitionedRegionStats;
+
+public class ClearPRMessageTest {
+
+ ClearPRMessage message;
+ PartitionedRegion region;
+ PartitionedRegionDataStore dataStore;
+ BucketRegion bucketRegion;
+
+ @Before
+ public void setup() throws ForceReattemptException {
+ message = spy(new ClearPRMessage());
+ region = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
+ dataStore = mock(PartitionedRegionDataStore.class);
+ when(region.getDataStore()).thenReturn(dataStore);
+ bucketRegion = mock(BucketRegion.class);
+ when(dataStore.getInitializedBucketForId(any(), any())).thenReturn(bucketRegion);
+ }
+
+ @Test
+ public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAtFirstCheck() {
+ when(bucketRegion.isPrimary()).thenReturn(false);
+
+ assertThatThrownBy(() -> message.doLocalClear(region))
+ .isInstanceOf(ForceReattemptException.class)
+ .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
+ }
+
+ @Test
+ public void doLocalClearThrowsExceptionWhenLockCannotBeObtained() {
+ DistributedLockService mockLockService = mock(DistributedLockService.class);
+ doReturn(mockLockService).when(message).getPartitionRegionLockService();
+
+ when(mockLockService.lock(anyString(), anyLong(), anyLong())).thenReturn(false);
+ when(bucketRegion.isPrimary()).thenReturn(true);
+
+ assertThatThrownBy(() -> message.doLocalClear(region))
+ .isInstanceOf(ForceReattemptException.class)
+ .hasMessageContaining(ClearPRMessage.BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE);
+ }
+
+ @Test
+ public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAfterObtainingLock() {
+ DistributedLockService mockLockService = mock(DistributedLockService.class);
+ doReturn(mockLockService).when(message).getPartitionRegionLockService();
+
+ // Be primary on the first check, then be not primary on the second check
+ when(bucketRegion.isPrimary()).thenReturn(true).thenReturn(false);
+ when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+
+ assertThatThrownBy(() -> message.doLocalClear(region))
+ .isInstanceOf(ForceReattemptException.class)
+ .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
+ // Confirm that we actually obtained and released the lock
+ verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong());
+ verify(mockLockService, times(1)).unlock(any());
+ }
+
+ @Test
+ public void doLocalClearThrowsForceReattemptExceptionWhenAnExceptionIsThrownDuringClearOperation() {
+ DistributedLockService mockLockService = mock(DistributedLockService.class);
+ doReturn(mockLockService).when(message).getPartitionRegionLockService();
+ NullPointerException exception = new NullPointerException("Error encountered");
+ doThrow(exception).when(bucketRegion).cmnClearRegion(any(), anyBoolean(), anyBoolean());
+
+ // Be primary on every check and obtain the lock, so the clear operation is attempted
+ when(bucketRegion.isPrimary()).thenReturn(true);
+ when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+
+ assertThatThrownBy(() -> message.doLocalClear(region))
+ .isInstanceOf(ForceReattemptException.class)
+ .hasMessageContaining(ClearPRMessage.EXCEPTION_THROWN_DURING_CLEAR_OPERATION);
+
+ // Confirm that cmnClearRegion was called
+ verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean());
+ }
+
+ @Test
+ public void doLocalClearInvokesCmnClearRegionWhenBucketIsPrimaryAndLockIsObtained()
+ throws ForceReattemptException {
+ DistributedLockService mockLockService = mock(DistributedLockService.class);
+ doReturn(mockLockService).when(message).getPartitionRegionLockService();
+
+
+ // Be primary on every check and obtain the lock, so the clear operation goes ahead
+ when(bucketRegion.isPrimary()).thenReturn(true);
+ when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+ assertThat(message.doLocalClear(region)).isTrue();
+
+ // Confirm that cmnClearRegion was called
+ verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean());
+
+ // Confirm that we actually obtained and released the lock
+ verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong());
+ verify(mockLockService, times(1)).unlock(any());
+ }
+
+ @Test
+ public void initMessageSetsReplyProcessorCorrectlyWithDefinedReplyProcessor() {
+ InternalDistributedMember sender = mock(InternalDistributedMember.class);
+
+ Set<InternalDistributedMember> recipients = new HashSet<>();
+ recipients.add(sender);
+
+ ClearPRMessage.ClearResponse mockProcessor = mock(ClearPRMessage.ClearResponse.class);
+ int mockProcessorId = 5;
+ when(mockProcessor.getProcessorId()).thenReturn(mockProcessorId);
+
+ message.initMessage(region, recipients, mockProcessor);
+
+ verify(mockProcessor, times(1)).enableSevereAlertProcessing();
+ assertThat(message.getProcessorId()).isEqualTo(mockProcessorId);
+ }
+
+ @Test
+ public void initMessageSetsProcessorIdToZeroWithNullProcessor() {
+ message.initMessage(region, null, null);
+
+ assertThat(message.getProcessorId()).isEqualTo(0);
+ }
+
+ @Test
+ public void sendThrowsExceptionIfPutOutgoingMethodReturnsNonNullSetOfFailures() {
+ InternalDistributedMember recipient = mock(InternalDistributedMember.class);
+
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ when(region.getDistributionManager()).thenReturn(distributionManager);
+
+ doNothing().when(message).initMessage(any(), any(), any());
+ Set<InternalDistributedMember> failures = new HashSet<>();
+ failures.add(recipient);
+
+ when(distributionManager.putOutgoing(message)).thenReturn(failures);
+
+ assertThatThrownBy(() -> message.send(recipient, region))
+ .isInstanceOf(ForceReattemptException.class)
+ .hasMessageContaining("Failed sending <" + message + ">");
+ }
+
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ @Test
+ public void operateOnPartitionedRegionCallsSendReplyWithNoExceptionWhenDoLocalClearSucceeds()
+ throws ForceReattemptException {
+ ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class);
+ InternalDistributedMember sender = mock(InternalDistributedMember.class);
+ int processorId = 1000;
+ int startTime = 0;
+
+ doReturn(true).when(message).doLocalClear(region);
+ doReturn(sender).when(message).getSender();
+ doReturn(processorId).when(message).getProcessorId();
+
+ // We don't want to deal with mocking the behavior of sendReply() in this test, so we mock it to
+ // do nothing and verify later that it was called with proper input
+ doNothing().when(message).sendReply(any(), anyInt(), any(), any(), any(), anyLong());
+
+ message.operateOnPartitionedRegion(distributionManager, region, startTime);
+
+ verify(message, times(1)).sendReply(sender, processorId, distributionManager, null, region,
+ startTime);
+ }
+
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ @Test
+ public void operateOnPartitionedRegionCallsSendReplyWithExceptionWhenDoLocalClearFailsWithException()
+ throws ForceReattemptException {
+ ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class);
+ InternalDistributedMember sender = mock(InternalDistributedMember.class);
+ int processorId = 1000;
+ int startTime = 0;
+ ForceReattemptException exception =
+ new ForceReattemptException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
+
+ doThrow(exception).when(message).doLocalClear(region);
+ doReturn(sender).when(message).getSender();
+ doReturn(processorId).when(message).getProcessorId();
+
+ // We don't want to deal with mocking the behavior of sendReply() in this test, so we mock it to
+ // do nothing and verify later that it was called with proper input
+ doNothing().when(message).sendReply(any(), anyInt(), any(), any(), any(), anyLong());
+
+ message.operateOnPartitionedRegion(distributionManager, region, startTime);
+
+ verify(message, times(1)).sendReply(any(), anyInt(), any(), notNull(), any(), anyLong());
+ }
+
+ @Test
+ public void sendReplyEndsMessageProcessingIfWeHaveARegionAndHaveStartedProcessing() {
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ InternalDistributedMember recipient = mock(InternalDistributedMember.class);
+ PartitionedRegionStats partitionedRegionStats = mock(PartitionedRegionStats.class);
+ when(region.getPrStats()).thenReturn(partitionedRegionStats);
+
+ int processorId = 1000;
+ int startTime = 10000;
+ ReplyException exception = new ReplyException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
+
+ ReplySender replySender = mock(ReplySender.class);
+ doReturn(replySender).when(message).getReplySender(distributionManager);
+
+ message.sendReply(recipient, processorId, distributionManager, exception, region, startTime);
+
+ verify(partitionedRegionStats, times(1)).endPartitionMessagesProcessing(startTime);
+ }
+
+ @Test
+ public void sendReplyDoesNotEndMessageProcessingIfStartTimeIsZero() {
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ InternalDistributedMember recipient = mock(InternalDistributedMember.class);
+ PartitionedRegionStats partitionedRegionStats = mock(PartitionedRegionStats.class);
+ when(region.getPrStats()).thenReturn(partitionedRegionStats);
+
+ int processorId = 1000;
+ int startTime = 0;
+ ReplyException exception = new ReplyException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
+
+ ReplySender replySender = mock(ReplySender.class);
+ doReturn(replySender).when(message).getReplySender(distributionManager);
+
+ message.sendReply(recipient, processorId, distributionManager, exception, region, startTime);
+
+ verify(partitionedRegionStats, times(0)).endPartitionMessagesProcessing(startTime);
+ }
+
+ @Test
+ public void clearReplyMessageProcessCallsSetResponseIfReplyProcessorIsInstanceOfClearResponse() {
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ DMStats mockStats = mock(DMStats.class);
+ when(distributionManager.getStats()).thenReturn(mockStats);
+ ClearPRMessage.ClearReplyMessage clearReplyMessage = new ClearPRMessage.ClearReplyMessage();
+ ClearPRMessage.ClearResponse mockProcessor = mock(ClearPRMessage.ClearResponse.class);
+
+ clearReplyMessage.process(distributionManager, mockProcessor);
+
+ verify(mockProcessor, times(1)).setResponse(clearReplyMessage);
+ }
+}
diff --git a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
index bf4dd99..7eabedb 100644
--- a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
+++ b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
@@ -57,6 +57,9 @@ public interface DataSerializableFixedID extends SerializationVersions, BasicSer
// NOTE, codes < -65536 will take 4 bytes to serialize
// NOTE, codes < -128 will take 2 bytes to serialize
+ short PR_CLEAR_REPLY_MESSAGE = -164;
+ short PR_CLEAR_MESSAGE = -163;
+
short DISTRIBUTED_PING_MESSAGE = -162;
short REGION_REDUNDANCY_STATUS = -161;