You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2020/10/28 19:14:30 UTC
[geode] 03/23: PR.clear's event id should be created and used in BR
(#4805)
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 4abc37899137b12f64df3a70484eda1954f365d2
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Mon Mar 16 17:35:35 2020 -0700
PR.clear's event id should be created and used in BR (#4805)
* GEODE-7857: PR.clear's event id should be created and used in BR
---
.../PartitionedRegionPersistentClearDUnitTest.java | 2 +-
.../codeAnalysis/sanctionedDataSerializables.txt | 4 +-
.../geode/internal/cache/PartitionedRegion.java | 8 +--
.../internal/cache/partitioned/ClearPRMessage.java | 12 ++--
.../internal/cache/PartitionedRegionTest.java | 65 ++++++++++++++++++++++
5 files changed, 80 insertions(+), 11 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java
index 847699b..c758446 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java
@@ -21,6 +21,6 @@ import org.apache.geode.cache.RegionShortcut;
public class PartitionedRegionPersistentClearDUnitTest extends PartitionedRegionClearDUnitTest {
protected RegionShortcut getRegionShortCut() {
- return RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW;
+ return RegionShortcut.PARTITION_REDUNDANT_PERSISTENT;
}
}
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index fb83c84..8e522a2 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1377,8 +1377,8 @@ fromData,27
toData,27
org/apache/geode/internal/cache/partitioned/ClearPRMessage,2
-fromData,19
-toData,36
+fromData,30
+toData,44
org/apache/geode/internal/cache/partitioned/ClearPRMessage$ClearReplyMessage,2
fromData,17
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 1aa427a..ffb01af 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -2191,7 +2191,7 @@ public class PartitionedRegion extends LocalRegion
}
// create ClearPRMessage per bucket
- List<ClearPRMessage> clearMsgList = createClearPRMessages();
+ List<ClearPRMessage> clearMsgList = createClearPRMessages(regionEvent.getEventId());
for (ClearPRMessage clearPRMessage : clearMsgList) {
int bucketId = clearPRMessage.getBucketId();
checkReadiness();
@@ -2363,10 +2363,10 @@ public class PartitionedRegion extends LocalRegion
}
}
- List<ClearPRMessage> createClearPRMessages() {
+ List<ClearPRMessage> createClearPRMessages(EventID eventID) {
ArrayList<ClearPRMessage> clearMsgList = new ArrayList<>();
- for (int bucketId = 0; bucketId < this.totalNumberOfBuckets; bucketId++) {
- ClearPRMessage clearPRMessage = new ClearPRMessage(bucketId);
+ for (int bucketId = 0; bucketId < getTotalNumberOfBuckets(); bucketId++) {
+ ClearPRMessage clearPRMessage = new ClearPRMessage(bucketId, eventID);
clearMsgList.add(clearPRMessage);
}
return clearMsgList;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
index 9fa8057..cc01920 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
@@ -56,6 +56,8 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
private Integer bucketId;
+ private EventID eventID;
+
public static final String BUCKET_NON_PRIMARY_MESSAGE =
"The bucket region on target member is no longer primary";
public static final String EXCEPTION_THROWN_DURING_CLEAR_OPERATION =
@@ -71,8 +73,9 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
*/
public ClearPRMessage() {}
- public ClearPRMessage(int bucketId) {
+ public ClearPRMessage(int bucketId, EventID eventID) {
this.bucketId = bucketId;
+ this.eventID = eventID;
}
public void initMessage(PartitionedRegion region, Set<InternalDistributedMember> recipients,
@@ -119,6 +122,7 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
} else {
InternalDataSerializer.writeSignedVL(bucketId, out);
}
+ DataSerializer.writeObject(this.eventID, out);
}
@Override
@@ -126,6 +130,7 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
throws IOException, ClassNotFoundException {
super.fromData(in, context);
this.bucketId = (int) InternalDataSerializer.readSignedVL(in);
+ this.eventID = (EventID) DataSerializer.readObject(in);
}
@Override
@@ -168,9 +173,8 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
}
try {
- RegionEventImpl regionEvent = new RegionEventImpl();
- regionEvent.setOperation(Operation.REGION_CLEAR);
- regionEvent.setRegion(bucketRegion);
+ RegionEventImpl regionEvent = new RegionEventImpl(bucketRegion, Operation.REGION_CLEAR, null,
+ false, region.getMyId(), eventID);
bucketRegion.cmnClearRegion(regionEvent, true, true);
} catch (PartitionOfflineException poe) {
logger.info(
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
index 742db8a..898c4f7 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
@@ -21,10 +21,12 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
@@ -39,6 +41,7 @@ import static org.mockito.quality.Strictness.STRICT_STUBS;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -55,6 +58,7 @@ import org.mockito.junit.MockitoRule;
import org.apache.geode.CancelCriterion;
import org.apache.geode.Statistics;
import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheLoader;
import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.Operation;
@@ -71,6 +75,7 @@ import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.control.InternalResourceManager;
+import org.apache.geode.internal.cache.partitioned.ClearPRMessage;
import org.apache.geode.internal.cache.partitioned.FetchKeysMessage;
import org.apache.geode.internal.cache.partitioned.colocation.ColocationLoggerFactory;
import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
@@ -208,6 +213,66 @@ public class PartitionedRegionTest {
}
@Test
+ public void clearShouldNotThrowUnsupportedOperationException() {
+ PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
+ doNothing().when(spyPartitionedRegion).checkReadiness();
+ doCallRealMethod().when(spyPartitionedRegion).basicClear(any());
+ doNothing().when(spyPartitionedRegion).basicClear(any(), anyBoolean());
+ spyPartitionedRegion.clear();
+ }
+
+ @Test(expected = CacheClosedException.class)
+ public void clearShouldThrowCacheClosedExceptionIfShutdownAll() {
+ PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
+ RegionEventImpl regionEvent =
+ new RegionEventImpl(spyPartitionedRegion, Operation.REGION_CLEAR, null, false,
+ spyPartitionedRegion.getMyId(), true);
+ when(cache.isCacheAtShutdownAll()).thenReturn(true);
+ when(cache.getCacheClosedException("Cache is shutting down"))
+ .thenReturn(new CacheClosedException("Cache is shutting down"));
+ DistributedLockService lockService = mock(DistributedLockService.class);
+ when(spyPartitionedRegion.getPartitionedRegionLockService()).thenReturn(lockService);
+ String lockName = "_clearOperation" + spyPartitionedRegion.getFullPath().replace('/', '_');
+ when(lockService.lock(lockName, -1, -1)).thenReturn(true);
+ spyPartitionedRegion.basicClear(regionEvent, true);
+ }
+
+ @Test
+ public void createClearPRMessagesShouldCreateMessagePerBucket() {
+ PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
+ RegionEventImpl regionEvent =
+ new RegionEventImpl(spyPartitionedRegion, Operation.REGION_CLEAR, null, false,
+ spyPartitionedRegion.getMyId(), true);
+ when(spyPartitionedRegion.getTotalNumberOfBuckets()).thenReturn(3);
+ EventID eventID = new EventID(spyPartitionedRegion.getCache().getDistributedSystem());
+ List<ClearPRMessage> msgs = spyPartitionedRegion.createClearPRMessages(eventID);
+ assertThat(msgs.size()).isEqualTo(3);
+ }
+
+ @Test
+ public void sendEachMessagePerBucket() {
+ PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
+ RegionEventImpl regionEvent =
+ new RegionEventImpl(spyPartitionedRegion, Operation.REGION_CLEAR, null, false,
+ spyPartitionedRegion.getMyId(), true);
+ when(cache.isCacheAtShutdownAll()).thenReturn(false);
+ DistributedLockService lockService = mock(DistributedLockService.class);
+ when(spyPartitionedRegion.getPartitionedRegionLockService()).thenReturn(lockService);
+ when(spyPartitionedRegion.getTotalNumberOfBuckets()).thenReturn(3);
+ String lockName = "_clearOperation" + spyPartitionedRegion.getFullPath().replace('/', '_');
+ when(lockService.lock(lockName, -1, -1)).thenReturn(true);
+ when(spyPartitionedRegion.hasListener()).thenReturn(true);
+ doNothing().when(spyPartitionedRegion).dispatchListenerEvent(any(), any());
+ doNothing().when(spyPartitionedRegion).notifyBridgeClients(eq(regionEvent));
+ doNothing().when(spyPartitionedRegion).checkReadiness();
+ doNothing().when(lockService).unlock(lockName);
+ spyPartitionedRegion.basicClear(regionEvent, true);
+ verify(spyPartitionedRegion, times(3)).sendClearMsgByBucket(any(), any());
+ verify(spyPartitionedRegion, times(1)).dispatchListenerEvent(any(), any());
+ verify(spyPartitionedRegion, times(1)).notifyBridgeClients(eq(regionEvent));
+ }
+
+ @Test
public void getBucketNodeForReadOrWriteReturnsPrimaryNodeForRegisterInterest() {
// ARRANGE
EntryEventImpl clientEvent = mock(EntryEventImpl.class);