You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2021/04/30 20:53:01 UTC
[geode] branch feature/GEODE-7665 updated: GEODE-9195: Remove PR clear local locking (#6410)
This is an automated email from the ASF dual-hosted git repository.
klund pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-7665 by this push:
new e4a7b61 GEODE-9195: Remove PR clear local locking (#6410)
e4a7b61 is described below
commit e4a7b618ccccbf56d2582cfd11b6a86ea91b6e44
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Apr 30 13:51:33 2021 -0700
GEODE-9195: Remove PR clear local locking (#6410)
Unit test changes in BucketRegion and DistributedRegion.
Unit test most of PartitionedRegionClearMessage.
---
.../codeAnalysis/sanctionedDataSerializables.txt | 2 +-
.../apache/geode/internal/cache/BucketRegion.java | 25 +-
.../geode/internal/cache/DistributedRegion.java | 29 ++-
.../internal/cache/PartitionedRegionClear.java | 15 +-
.../cache/PartitionedRegionClearMessage.java | 109 ++++++--
.../geode/internal/cache/RegionEventFactory.java | 30 +++
.../internal/cache/BucketRegionJUnitTest.java | 59 ++++-
.../internal/cache/DistributedRegionTest.java | 93 +++++--
.../cache/PartitionedRegionClearMessageTest.java | 285 +++++++++++++++++++++
9 files changed, 561 insertions(+), 86 deletions(-)
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index d1a8742..35d7a2b 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1076,7 +1076,7 @@ fromData,207
toData,178
org/apache/geode/internal/cache/PartitionedRegionClearMessage,2
-fromData,40
+fromData,49
toData,36
org/apache/geode/internal/cache/PartitionedRegionClearMessage$PartitionedRegionClearReplyMessage,2
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 49f6aad..18f2ef9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -37,6 +37,7 @@ import org.apache.geode.InternalGemFireError;
import org.apache.geode.InvalidDeltaException;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.CacheWriterException;
@@ -577,11 +578,9 @@ public class BucketRegion extends DistributedRegion implements Bucket {
// get rvvLock
Set<InternalDistributedMember> participants =
getCacheDistributionAdvisor().adviseInvalidateRegion();
- boolean isLockedAlready = this.partitionedRegion.getPartitionedRegionClear()
- .isLockedForListenerAndClientNotification();
try {
- obtainWriteLocksForClear(regionEvent, participants, isLockedAlready);
+ obtainWriteLocksForClear(regionEvent, participants);
// no need to dominate my own rvv.
// Clear is on going here, there won't be GII for this member
clearRegionLocally(regionEvent, cacheWrite, null);
@@ -589,10 +588,28 @@ public class BucketRegion extends DistributedRegion implements Bucket {
// TODO: call reindexUserDataRegion if there're lucene indexes
} finally {
- releaseWriteLocksForClear(regionEvent, participants, isLockedAlready);
+ releaseWriteLocksForClear(regionEvent, participants);
}
}
+ @Override
+ protected void obtainWriteLocksForClear(RegionEventImpl regionEvent,
+ Set<InternalDistributedMember> participants) {
+ lockAndFlushClearToOthers(regionEvent, participants);
+ }
+
+ @Override
+ protected void releaseWriteLocksForClear(RegionEventImpl regionEvent,
+ Set<InternalDistributedMember> participants) {
+ distributedClearOperationReleaseLocks(regionEvent, participants);
+ }
+
+ @VisibleForTesting
+ void distributedClearOperationReleaseLocks(RegionEventImpl regionEvent,
+ Set<InternalDistributedMember> participants) {
+ DistributedClearOperation.releaseLocks(regionEvent, participants);
+ }
+
long generateTailKey() {
long key = eventSeqNum.addAndGet(partitionedRegion.getTotalNumberOfBuckets());
if (key < 0 || key % getPartitionedRegion().getTotalNumberOfBuckets() != getId()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 3d6df11..0f419ad 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2027,13 +2027,13 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
getCacheDistributionAdvisor().adviseInvalidateRegion();
// pause all generation of versions and flush from the other members to this one
try {
- obtainWriteLocksForClear(regionEvent, participants, false);
+ obtainWriteLocksForClear(regionEvent, participants);
clearRegionLocally(regionEvent, cacheWrite, null);
if (!regionEvent.isOriginRemote() && regionEvent.getOperation().isDistributed()) {
distributeClearOperation(regionEvent, null, participants);
}
} finally {
- releaseWriteLocksForClear(regionEvent, participants, false);
+ releaseWriteLocksForClear(regionEvent, participants);
}
} finally {
distributedUnlockForClear();
@@ -2082,30 +2082,31 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
}
}
-
/**
* obtain locks preventing generation of new versions in other members
*/
protected void obtainWriteLocksForClear(RegionEventImpl regionEvent,
- Set<InternalDistributedMember> participants, boolean localLockedAlready) {
- if (!localLockedAlready) {
- lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent);
- }
- lockAndFlushClearToOthers(regionEvent, participants);
+ Set<InternalDistributedMember> recipients) {
+ lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent);
+ lockAndFlushClearToOthers(regionEvent, recipients);
}
/**
* releases the locks obtained in obtainWriteLocksForClear
*/
protected void releaseWriteLocksForClear(RegionEventImpl regionEvent,
- Set<InternalDistributedMember> participants,
- boolean localLockedAlready) {
- if (!localLockedAlready) {
- releaseLockLocallyForClear(regionEvent);
- }
- DistributedClearOperation.releaseLocks(regionEvent, participants);
+ Set<InternalDistributedMember> recipients) {
+ releaseLockLocallyForClear(regionEvent);
+ distributedClearOperationReleaseLocks(regionEvent, recipients);
}
+ @VisibleForTesting
+ void distributedClearOperationReleaseLocks(RegionEventImpl regionEvent,
+ Set<InternalDistributedMember> recipients) {
+ DistributedClearOperation.releaseLocks(regionEvent, recipients);
+ }
+
+ @VisibleForTesting
void lockAndFlushClearToOthers(RegionEventImpl regionEvent,
Set<InternalDistributedMember> participants) {
DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
index 5f4e589..8403306 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
@@ -36,6 +36,7 @@ import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.PartitionedRegionClearMessage.OperationType;
import org.apache.geode.internal.cache.PartitionedRegionClearMessage.PartitionedRegionClearResponse;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -141,8 +142,7 @@ public class PartitionedRegionClear {
*/
void obtainLockForClear(RegionEventImpl event) {
obtainClearLockLocal(partitionedRegion.getDistributionManager().getId());
- sendPartitionedRegionClearMessage(event,
- PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR);
+ sendPartitionedRegionClearMessage(event, OperationType.OP_LOCK_FOR_PR_CLEAR);
}
/**
@@ -150,8 +150,7 @@ public class PartitionedRegionClear {
*/
void releaseLockForClear(RegionEventImpl event) {
releaseClearLockLocal();
- sendPartitionedRegionClearMessage(event,
- PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR);
+ sendPartitionedRegionClearMessage(event, OperationType.OP_UNLOCK_FOR_PR_CLEAR);
}
/**
@@ -162,7 +161,7 @@ public class PartitionedRegionClear {
Set<Integer> localPrimaryBuckets = clearRegionLocal(regionEvent);
// this includes all remote primary buckets and their secondaries
Set<Integer> remotePrimaryBuckets = sendPartitionedRegionClearMessage(regionEvent,
- PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR);
+ OperationType.OP_PR_CLEAR);
Set<Integer> allBucketsCleared = new HashSet<>();
allBucketsCleared.addAll(localPrimaryBuckets);
@@ -332,7 +331,7 @@ public class PartitionedRegionClear {
}
protected Set<Integer> sendPartitionedRegionClearMessage(RegionEventImpl event,
- PartitionedRegionClearMessage.OperationType op) {
+ OperationType op) {
RegionEventImpl eventForLocalClear = (RegionEventImpl) event.clone();
eventForLocalClear.setOperation(Operation.REGION_LOCAL_CLEAR);
@@ -349,7 +348,7 @@ public class PartitionedRegionClear {
* @return buckets that are cleared. empty set if any exception happened
*/
protected Set<Integer> attemptToSendPartitionedRegionClearMessage(RegionEventImpl event,
- PartitionedRegionClearMessage.OperationType op)
+ OperationType op)
throws ForceReattemptException {
Set<Integer> clearedBuckets = new HashSet<>();
@@ -394,7 +393,7 @@ public class PartitionedRegionClear {
clearMessage.send();
clearResponse.waitForRepliesUninterruptibly();
- clearedBuckets = clearResponse.bucketsCleared;
+ clearedBuckets = clearResponse.getBucketsCleared();
} catch (ReplyException e) {
Throwable cause = e.getCause();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
index cd33f78..c4c1ca5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
@@ -14,15 +14,19 @@
*/
package org.apache.geode.internal.cache;
+import static java.util.Collections.unmodifiableSet;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Collection;
import java.util.Objects;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.DataSerializer;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.Operation;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
@@ -52,22 +56,47 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
private Object callbackArgument;
private OperationType operationType;
private EventID eventId;
- private PartitionedRegion partitionedRegion;
private Set<Integer> bucketsCleared;
+ private DistributionManager distributionManager;
+ private RegionEventFactory regionEventFactory;
public PartitionedRegionClearMessage() {
// nothing
}
- PartitionedRegionClearMessage(Set<InternalDistributedMember> recipients,
- PartitionedRegion partitionedRegion, ReplyProcessor21 replyProcessor21,
- PartitionedRegionClearMessage.OperationType operationType,
+ PartitionedRegionClearMessage(Collection<InternalDistributedMember> recipients,
+ PartitionedRegion partitionedRegion,
+ ReplyProcessor21 replyProcessor21,
+ OperationType operationType,
final RegionEventImpl regionEvent) {
- super(recipients, partitionedRegion.getPRId(), replyProcessor21);
- this.partitionedRegion = partitionedRegion;
+ this(recipients,
+ partitionedRegion.getDistributionManager(),
+ partitionedRegion.getPRId(),
+ replyProcessor21,
+ operationType,
+ regionEvent.getRawCallbackArgument(),
+ regionEvent.getEventId(),
+ partitionedRegion.getCache().getTxManager().isDistributed(),
+ RegionEventImpl::new);
+ }
+
+ @VisibleForTesting
+ PartitionedRegionClearMessage(Collection<InternalDistributedMember> recipients,
+ DistributionManager distributionManager,
+ int partitionedRegionId,
+ ReplyProcessor21 replyProcessor21,
+ OperationType operationType,
+ Object callbackArgument,
+ EventID eventId,
+ boolean isTransactionDistributed,
+ RegionEventFactory regionEventFactory) {
+ super(recipients, partitionedRegionId, replyProcessor21);
+ setTransactionDistributed(isTransactionDistributed);
+ this.distributionManager = distributionManager;
this.operationType = operationType;
- callbackArgument = regionEvent.getRawCallbackArgument();
- eventId = regionEvent.getEventId();
+ this.callbackArgument = callbackArgument;
+ this.eventId = eventId;
+ this.regionEventFactory = regionEventFactory;
}
@Override
@@ -82,8 +111,7 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
public void send() {
Objects.requireNonNull(getRecipients(), "ClearMessage NULL recipients set");
- setTransactionDistributed(partitionedRegion.getCache().getTxManager().isDistributed());
- partitionedRegion.getDistributionManager().putOutgoing(this);
+ distributionManager.putOutgoing(this);
}
@Override
@@ -108,15 +136,17 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
return true;
}
+ PartitionedRegionClear partitionedRegionClear = partitionedRegion.getPartitionedRegionClear();
+
if (operationType == OperationType.OP_LOCK_FOR_PR_CLEAR) {
- partitionedRegion.getPartitionedRegionClear().obtainClearLockLocal(getSender());
+ partitionedRegionClear.obtainClearLockLocal(getSender());
} else if (operationType == OperationType.OP_UNLOCK_FOR_PR_CLEAR) {
- partitionedRegion.getPartitionedRegionClear().releaseClearLockLocal();
+ partitionedRegionClear.releaseClearLockLocal();
} else {
- RegionEventImpl event =
- new RegionEventImpl(partitionedRegion, Operation.REGION_CLEAR, callbackArgument, true,
+ RegionEventImpl event = (RegionEventImpl) regionEventFactory
+ .create(partitionedRegion, Operation.REGION_CLEAR, callbackArgument, true,
partitionedRegion.getMyId(), getEventID());
- bucketsCleared = partitionedRegion.getPartitionedRegionClear().clearRegionLocal(event);
+ bucketsCleared = partitionedRegionClear.clearRegionLocal(event);
}
return true;
}
@@ -125,9 +155,9 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
protected void appendFields(StringBuilder stringBuilder) {
super.appendFields(stringBuilder);
stringBuilder
- .append(" cbArg=")
+ .append(" callbackArgument=")
.append(callbackArgument)
- .append(" op=")
+ .append(" operationType=")
.append(operationType);
}
@@ -141,8 +171,10 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
throws IOException, ClassNotFoundException {
super.fromData(in, context);
callbackArgument = DataSerializer.readObject(in);
- operationType = PartitionedRegionClearMessage.OperationType.values()[in.readByte()];
+ operationType = OperationType.values()[in.readByte()];
eventId = DataSerializer.readObject(in);
+
+ regionEventFactory = RegionEventImpl::new;
}
@Override
@@ -160,21 +192,36 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
if (partitionedRegion != null && startTime > 0) {
partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime);
}
- PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage
+ PartitionedRegionClearReplyMessage
.send(recipient, processorId, getReplySender(distributionManager), operationType,
bucketsCleared, replyException);
}
+ @VisibleForTesting
+ DistributionManager getDistributionManagerForTesting() {
+ return distributionManager;
+ }
+
+ @VisibleForTesting
+ Object getCallbackArgumentForTesting() {
+ return callbackArgument;
+ }
+
+ @VisibleForTesting
+ RegionEventFactory getRegionEventFactoryForTesting() {
+ return regionEventFactory;
+ }
+
/**
* The response on which to wait for all the replies. This response ignores any exceptions
* received from the "far side"
*/
public static class PartitionedRegionClearResponse extends ReplyProcessor21 {
- CopyOnWriteHashSet<Integer> bucketsCleared = new CopyOnWriteHashSet<>();
+ private final Set<Integer> bucketsCleared = new CopyOnWriteHashSet<>();
public PartitionedRegionClearResponse(InternalDistributedSystem system,
- Set<InternalDistributedMember> recipients) {
+ Collection<InternalDistributedMember> recipients) {
super(system, recipients);
}
@@ -188,12 +235,15 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
}
process(message, true);
}
+
+ Set<Integer> getBucketsCleared() {
+ return unmodifiableSet(bucketsCleared);
+ }
}
public static class PartitionedRegionClearReplyMessage extends ReplyMessage {
private Set<Integer> bucketsCleared;
-
private OperationType operationType;
@Override
@@ -201,14 +251,17 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
return true;
}
- public static void send(InternalDistributedMember recipient, int processorId,
- ReplySender replySender, OperationType operationType, Set<Integer> bucketsCleared,
+ private static void send(InternalDistributedMember recipient,
+ int processorId,
+ ReplySender replySender,
+ OperationType operationType,
+ Set<Integer> bucketsCleared,
ReplyException replyException) {
Objects.requireNonNull(recipient, "partitionedRegionClearReplyMessage NULL reply message");
- PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage replyMessage =
- new PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage(processorId,
- operationType, bucketsCleared, replyException);
+ PartitionedRegionClearReplyMessage replyMessage =
+ new PartitionedRegionClearReplyMessage(processorId, operationType, bucketsCleared,
+ replyException);
replyMessage.setRecipient(recipient);
replySender.putOutgoing(replyMessage);
@@ -260,7 +313,7 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
public void fromData(DataInput in, DeserializationContext context)
throws IOException, ClassNotFoundException {
super.fromData(in, context);
- operationType = PartitionedRegionClearMessage.OperationType.values()[in.readByte()];
+ operationType = OperationType.values()[in.readByte()];
bucketsCleared = DataSerializer.readObject(in);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventFactory.java
new file mode 100644
index 0000000..c759a44
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.distributed.DistributedMember;
+
+@FunctionalInterface
+public interface RegionEventFactory {
+
+ RegionEvent create(PartitionedRegion partitionedRegion,
+ Operation operation,
+ Object callbackArgument,
+ boolean originRemote,
+ DistributedMember distributedMember,
+ EventID eventId);
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
index 0d1cc87..c0a635f 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
@@ -14,11 +14,13 @@
*/
package org.apache.geode.internal.cache;
+import static java.util.Collections.emptySet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -29,6 +31,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -36,6 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.junit.Test;
import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.statistics.StatisticsClock;
@@ -183,17 +187,6 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
}
@Test
- public void obtainWriteLocksForClearInBRShouldDistribute() {
- RegionEventImpl event = createClearRegionEvent();
- BucketRegion region = (BucketRegion) event.getRegion();
- doNothing().when(region).lockLocallyForClear(any(), any(), any());
- doNothing().when(region).lockAndFlushClearToOthers(any(), any());
- region.obtainWriteLocksForClear(event, null, false);
- verify(region).lockLocallyForClear(any(), any(), eq(event));
- verify(region).lockAndFlushClearToOthers(eq(event), eq(null));
- }
-
- @Test
public void updateSizeToZeroOnClearBucketRegion() {
RegionEventImpl event = createClearRegionEvent();
BucketRegion region = (BucketRegion) event.getRegion();
@@ -211,4 +204,48 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
long sizeAfterClear = region.getTotalBytes();
assertEquals(0, sizeAfterClear);
}
+
+ @Test
+ public void obtainWriteLocksForClearInBRShouldLockAndFlushToOthers() {
+ RegionEventImpl event = createClearRegionEvent();
+ BucketRegion region = (BucketRegion) event.getRegion();
+ doNothing().when(region).lockAndFlushClearToOthers(any(), any());
+ region.obtainWriteLocksForClear(event, null);
+ verify(region).lockAndFlushClearToOthers(eq(event), eq(null));
+ }
+
+ @Test
+ public void obtainWriteLocksForClear_invokes_lockAndFlushClearToOthers() {
+ Set<InternalDistributedMember> recipients = emptySet();
+ BucketRegion bucketRegion = bucketRegionForClearLocking();
+ RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+
+ bucketRegion.obtainWriteLocksForClear(regionEvent, recipients);
+
+ verify(bucketRegion).lockAndFlushClearToOthers(regionEvent, recipients);
+ }
+
+ @Test
+ public void releaseWriteLocksForClear_invokes_distributedClearOperationReleaseLocks() {
+ Set<InternalDistributedMember> recipients = emptySet();
+ BucketRegion bucketRegion = bucketRegionForClearLocking();
+ RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+
+ bucketRegion.releaseWriteLocksForClear(regionEvent, recipients);
+
+ verify(bucketRegion).distributedClearOperationReleaseLocks(regionEvent, recipients);
+ }
+
+ private BucketRegion bucketRegionForClearLocking() {
+ // use partial-mock with null fields to verify method invocations
+ BucketRegion bucketRegion = mock(BucketRegion.class, CALLS_REAL_METHODS);
+
+ // doNothing when invoking locking methods for clear
+ doNothing().when(bucketRegion).lockAndFlushClearToOthers(any(), any());
+
+ // doNothing when invoking unlocking methods for clear
+ doNothing().when(bucketRegion).distributedClearOperationReleaseLocks(any(), any());
+
+ return bucketRegion;
+ }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
index 13a2685..185c67d 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
@@ -14,14 +14,19 @@
*/
package org.apache.geode.internal.cache;
+import static java.util.Collections.emptySet;
import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Matchers.eq;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -29,6 +34,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Collections;
+import java.util.Set;
import org.junit.Before;
import org.junit.Test;
@@ -43,36 +49,22 @@ import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException;
import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
-
public class DistributedRegionTest {
+
private RegionVersionVector<VersionSource<Object>> vector;
private RegionVersionHolder<VersionSource<Object>> holder;
private VersionSource<Object> lostMemberVersionID;
private InternalDistributedMember member;
@Before
- @SuppressWarnings("unchecked")
public void setup() {
- vector = mock(RegionVersionVector.class);
- holder = mock(RegionVersionHolder.class);
- lostMemberVersionID = mock(VersionSource.class);
+ vector = uncheckedCast(mock(RegionVersionVector.class));
+ holder = uncheckedCast(mock(RegionVersionHolder.class));
+ lostMemberVersionID = uncheckedCast(mock(VersionSource.class));
member = mock(InternalDistributedMember.class);
}
@Test
- public void shouldBeMockable() throws Exception {
- DistributedRegion mockDistributedRegion = mock(DistributedRegion.class);
- EntryEventImpl mockEntryEventImpl = mock(EntryEventImpl.class);
- Object returnValue = new Object();
-
- when(mockDistributedRegion.validatedDestroy(any(), eq(mockEntryEventImpl)))
- .thenReturn(returnValue);
-
- assertThat(mockDistributedRegion.validatedDestroy(new Object(), mockEntryEventImpl))
- .isSameAs(returnValue);
- }
-
- @Test
public void cleanUpAfterFailedInitialImageHoldsLockForClear() {
DistributedRegion distributedRegion = mock(DistributedRegion.class, RETURNS_DEEP_STUBS);
RegionMap regionMap = mock(RegionMap.class);
@@ -99,7 +91,7 @@ public class DistributedRegionTest {
distributedRegion.cleanUpAfterFailedGII(true);
- verify(diskRegion).resetRecoveredEntries(eq(distributedRegion));
+ verify(diskRegion).resetRecoveredEntries(distributedRegion);
verify(distributedRegion, never()).closeEntries();
}
@@ -260,4 +252,65 @@ public class DistributedRegionTest {
.hasMessage("Parallel Gateway Sender " + senderId
+ " can not be used with replicated region " + regionPath);
}
+
+ @Test
+ public void obtainWriteLocksForClear_invokes_lockLocallyForClear() {
+ DistributedRegion distributedRegion = distributedRegionForClearLocking();
+ RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+
+ distributedRegion.obtainWriteLocksForClear(regionEvent, emptySet());
+
+ verify(distributedRegion).lockLocallyForClear(any(), any(), eq(regionEvent));
+ }
+
+ @Test
+ public void obtainWriteLocksForClear_invokes_lockAndFlushClearToOthers() {
+ Set<InternalDistributedMember> recipients = emptySet();
+ DistributedRegion distributedRegion = distributedRegionForClearLocking();
+ RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+
+ distributedRegion.obtainWriteLocksForClear(regionEvent, recipients);
+
+ verify(distributedRegion).lockAndFlushClearToOthers(regionEvent, recipients);
+ }
+
+ @Test
+ public void releaseWriteLocksForClear_invokes_releaseLockLocallyForClear() {
+ DistributedRegion distributedRegion = distributedRegionForClearLocking();
+ RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+
+ distributedRegion.releaseWriteLocksForClear(regionEvent, emptySet());
+
+ verify(distributedRegion).releaseLockLocallyForClear(regionEvent);
+ }
+
+ @Test
+ public void releaseWriteLocksForClear_invokes_distributedClearOperationReleaseLocks() {
+ Set<InternalDistributedMember> recipients = emptySet();
+ DistributedRegion distributedRegion = distributedRegionForClearLocking();
+ RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+
+ distributedRegion.releaseWriteLocksForClear(regionEvent, recipients);
+
+ verify(distributedRegion).distributedClearOperationReleaseLocks(regionEvent, recipients);
+ }
+
+ private DistributedRegion distributedRegionForClearLocking() {
+ // use partial-mock with null fields to verify method invocations
+ DistributedRegion distributedRegion = mock(DistributedRegion.class, CALLS_REAL_METHODS);
+
+ // stub out getDistributionManager and getMyId
+ doReturn(null).when(distributedRegion).getDistributionManager();
+ doReturn(null).when(distributedRegion).getMyId();
+
+ // doNothing when invoking locking methods for clear
+ doNothing().when(distributedRegion).lockAndFlushClearToOthers(any(), any());
+ doNothing().when(distributedRegion).lockLocallyForClear(any(), any(), any());
+
+ // doNothing when invoking unlocking methods for clear
+ doNothing().when(distributedRegion).distributedClearOperationReleaseLocks(any(), any());
+ doNothing().when(distributedRegion).releaseLockLocallyForClear(any());
+
+ return distributedRegion;
+ }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearMessageTest.java
new file mode 100644
index 0000000..4e67fc1
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearMessageTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static java.util.Collections.emptySet;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionAdvisor;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.PartitionedRegionClearMessage.OperationType;
+
+/**
+ * Unit tests for {@link PartitionedRegionClearMessage}.
+ *
+ * <p>
+ * Every collaborator is a Mockito mock. Apart from {@code construction_findsAllDependencies},
+ * the tests create the message through the constructor that takes each dependency explicitly.
+ */
+public class PartitionedRegionClearMessageTest {
+
+  /** Arbitrary partitioned region id handed to the message constructor. */
+  private static final int REGION_ID = 1;
+
+  /** Arbitrary start time handed to {@code operateOnPartitionedRegion}. */
+  private static final long START_TIME = 30;
+
+  private Collection<InternalDistributedMember> recipients;
+  private DistributionManager distributionManager;
+  private PartitionedRegion partitionedRegion;
+  private ReplyProcessor21 replyProcessor21;
+  private Object callbackArgument;
+  private EventID eventId;
+  private RegionEventFactory regionEventFactory;
+
+  @Before
+  public void setUp() {
+    recipients = emptySet();
+    distributionManager = mock(DistributionManager.class);
+    partitionedRegion = mock(PartitionedRegion.class);
+    replyProcessor21 = mock(ReplyProcessor21.class);
+    callbackArgument = new Object();
+    eventId = mock(EventID.class);
+    regionEventFactory = mock(RegionEventFactory.class);
+  }
+
+  @Test
+  public void construction_throwsNullPointerExceptionIfRecipientsIsNull() {
+    Throwable thrown = catchThrowable(() -> {
+      new PartitionedRegionClearMessage(null, distributionManager, REGION_ID,
+          replyProcessor21, OperationType.OP_PR_CLEAR, callbackArgument, eventId, false,
+          regionEventFactory);
+    });
+
+    assertThat(thrown).isInstanceOf(NullPointerException.class);
+  }
+
+  @Test
+  public void construction_findsAllDependencies() {
+    boolean isTransactionDistributed = true;
+    int regionId = 10;
+    InternalCache cache = mock(InternalCache.class);
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    TXManagerImpl txManager = mock(TXManagerImpl.class);
+    when(cache.getTxManager()).thenReturn(txManager);
+    when(partitionedRegion.getCache()).thenReturn(cache);
+    when(partitionedRegion.getDistributionManager()).thenReturn(distributionManager);
+    when(partitionedRegion.getPRId()).thenReturn(regionId);
+    when(regionEvent.getEventId()).thenReturn(eventId);
+    when(regionEvent.getRawCallbackArgument()).thenReturn(callbackArgument);
+    when(txManager.isDistributed()).thenReturn(isTransactionDistributed);
+
+    // this constructor derives its dependencies from the region and the region event
+    PartitionedRegionClearMessage message = new PartitionedRegionClearMessage(recipients,
+        partitionedRegion,
+        replyProcessor21,
+        OperationType.OP_PR_CLEAR,
+        regionEvent);
+
+    assertThat(message.getDistributionManagerForTesting()).isSameAs(distributionManager);
+    assertThat(message.getCallbackArgumentForTesting()).isSameAs(callbackArgument);
+    assertThat(message.getRegionId()).isEqualTo(regionId);
+    assertThat(message.getEventID()).isSameAs(eventId);
+    assertThat(message.isTransactionDistributed()).isEqualTo(isTransactionDistributed);
+
+    // the factory the message picked on its own (not the mocked field) must build real events
+    RegionEventFactory defaultRegionEventFactory = message.getRegionEventFactoryForTesting();
+    RegionEvent<?, ?> created =
+        defaultRegionEventFactory.create(partitionedRegion, Operation.DESTROY, callbackArgument,
+            false, mock(DistributedMember.class), mock(EventID.class));
+    assertThat(created).isInstanceOf(RegionEventImpl.class);
+  }
+
+  @Test
+  public void construction_setsTransactionDistributed() {
+    boolean isTransactionDistributed = true;
+    PartitionedRegionClearMessage message =
+        new PartitionedRegionClearMessage(recipients, distributionManager, REGION_ID,
+            replyProcessor21, OperationType.OP_PR_CLEAR, callbackArgument, eventId,
+            isTransactionDistributed, regionEventFactory);
+
+    boolean value = message.isTransactionDistributed();
+
+    assertThat(value).isEqualTo(isTransactionDistributed);
+  }
+
+  @Test
+  public void getEventID_returnsTheEventId() {
+    PartitionedRegionClearMessage message =
+        createMessage(distributionManager, OperationType.OP_PR_CLEAR);
+
+    EventID value = message.getEventID();
+
+    assertThat(value).isSameAs(eventId);
+  }
+
+  @Test
+  public void getOperationType_returnsTheOperationType() {
+    PartitionedRegionClearMessage message =
+        createMessage(distributionManager, OperationType.OP_PR_CLEAR);
+
+    OperationType value = message.getOperationType();
+
+    assertThat(value).isSameAs(OperationType.OP_PR_CLEAR);
+  }
+
+  @Test
+  public void send_putsOutgoing() {
+    PartitionedRegionClearMessage message =
+        createMessage(distributionManager, OperationType.OP_PR_CLEAR);
+
+    message.send();
+
+    verify(distributionManager).putOutgoing(message);
+  }
+
+  @Test
+  public void processCheckForPR_returnsForceReattemptException_whenRegionIsNotInitialized() {
+    DistributionAdvisor distributionAdvisor = mock(DistributionAdvisor.class);
+    when(distributionAdvisor.isInitialized()).thenReturn(false);
+    when(partitionedRegion.getDistributionAdvisor()).thenReturn(distributionAdvisor);
+    PartitionedRegionClearMessage message =
+        createMessage(distributionManager, OperationType.OP_PR_CLEAR);
+
+    Throwable throwable = message.processCheckForPR(partitionedRegion, distributionManager);
+
+    assertThat(throwable)
+        .isInstanceOf(ForceReattemptException.class)
+        .hasMessageContaining("could not find partitioned region with Id");
+  }
+
+  @Test
+  public void processCheckForPR_returnsNull_whenRegionIsNull() {
+    PartitionedRegionClearMessage message =
+        createMessage(distributionManager, OperationType.OP_PR_CLEAR);
+
+    Throwable throwable = message.processCheckForPR(null, distributionManager);
+
+    assertThat(throwable).isNull();
+  }
+
+  @Test
+  public void processCheckForPR_returnsNull_whenRegionIsInitialized() {
+    DistributionAdvisor distributionAdvisor = mock(DistributionAdvisor.class);
+    when(distributionAdvisor.isInitialized()).thenReturn(true);
+    when(partitionedRegion.getDistributionAdvisor()).thenReturn(distributionAdvisor);
+    PartitionedRegionClearMessage message =
+        createMessage(distributionManager, OperationType.OP_PR_CLEAR);
+
+    // pass the initialized region itself; passing null would only repeat the region-is-null test
+    Throwable throwable = message.processCheckForPR(partitionedRegion, distributionManager);
+
+    assertThat(throwable).isNull();
+  }
+
+  @Test
+  public void operateOnPartitionedRegion_returnsTrue_whenRegionIsNull() {
+    ClusterDistributionManager clusterDistributionManager = mock(ClusterDistributionManager.class);
+    PartitionedRegionClearMessage message =
+        createMessage(distributionManager, OperationType.OP_PR_CLEAR);
+
+    // the region argument is null here, as the test name states
+    boolean result = message.operateOnPartitionedRegion(clusterDistributionManager, null,
+        START_TIME);
+
+    assertThat(result).isTrue();
+  }
+
+  @Test
+  public void operateOnPartitionedRegion_returnsTrue_whenRegionIsDestroyed() {
+    ClusterDistributionManager clusterDistributionManager = mock(ClusterDistributionManager.class);
+    when(partitionedRegion.isDestroyed()).thenReturn(true);
+    PartitionedRegionClearMessage message =
+        createMessage(distributionManager, OperationType.OP_PR_CLEAR);
+
+    boolean result = message.operateOnPartitionedRegion(clusterDistributionManager,
+        partitionedRegion, START_TIME);
+
+    assertThat(result).isTrue();
+  }
+
+  @Test
+  public void operateOnPartitionedRegion_obtainsClearLockLocal_whenOperationTypeIs_OP_LOCK_FOR_PR_CLEAR() {
+    ClusterDistributionManager clusterDistributionManager = mock(ClusterDistributionManager.class);
+    PartitionedRegionClear partitionedRegionClear = mock(PartitionedRegionClear.class);
+    when(partitionedRegion.getPartitionedRegionClear()).thenReturn(partitionedRegionClear);
+    PartitionedRegionClearMessage message =
+        createMessage(clusterDistributionManager, OperationType.OP_LOCK_FOR_PR_CLEAR);
+
+    boolean result = message.operateOnPartitionedRegion(clusterDistributionManager,
+        partitionedRegion, START_TIME);
+
+    assertThat(result).isTrue();
+    verify(partitionedRegionClear).obtainClearLockLocal(any());
+  }
+
+  @Test
+  public void operateOnPartitionedRegion_releasesClearLockLocal_whenOperationTypeIs_OP_UNLOCK_FOR_PR_CLEAR() {
+    ClusterDistributionManager clusterDistributionManager = mock(ClusterDistributionManager.class);
+    PartitionedRegionClear partitionedRegionClear = mock(PartitionedRegionClear.class);
+    when(partitionedRegion.getPartitionedRegionClear()).thenReturn(partitionedRegionClear);
+    PartitionedRegionClearMessage message =
+        createMessage(clusterDistributionManager, OperationType.OP_UNLOCK_FOR_PR_CLEAR);
+
+    boolean result = message.operateOnPartitionedRegion(clusterDistributionManager,
+        partitionedRegion, START_TIME);
+
+    assertThat(result).isTrue();
+    verify(partitionedRegionClear).releaseClearLockLocal();
+  }
+
+  @Test
+  public void operateOnPartitionedRegion_clearsRegionLocal_whenOperationTypeIs_OP_PR_CLEAR() {
+    ClusterDistributionManager clusterDistributionManager = mock(ClusterDistributionManager.class);
+    PartitionedRegionClear partitionedRegionClear = mock(PartitionedRegionClear.class);
+    when(partitionedRegion.getPartitionedRegionClear())
+        .thenReturn(partitionedRegionClear);
+    when(regionEventFactory.create(any(), any(), any(), anyBoolean(), any(), any()))
+        .thenReturn(mock(RegionEventImpl.class));
+    PartitionedRegionClearMessage message =
+        createMessage(clusterDistributionManager, OperationType.OP_PR_CLEAR);
+
+    boolean result = message.operateOnPartitionedRegion(clusterDistributionManager,
+        partitionedRegion, START_TIME);
+
+    assertThat(result).isTrue();
+    verify(partitionedRegionClear).clearRegionLocal(any(RegionEventImpl.class));
+  }
+
+  /**
+   * Creates a message from the shared fixture fields with {@link #REGION_ID} as the region id
+   * and the transaction-distributed flag set to {@code false}.
+   *
+   * @param manager the distribution manager handed to the message constructor
+   * @param operationType the operation type the message should carry
+   * @return the newly constructed message
+   */
+  private PartitionedRegionClearMessage createMessage(DistributionManager manager,
+      OperationType operationType) {
+    return new PartitionedRegionClearMessage(recipients, manager, REGION_ID, replyProcessor21,
+        operationType, callbackArgument, eventId, false, regionEventFactory);
+  }
+}