You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2020/10/28 19:14:30 UTC

[geode] 03/23: PR.clear's event id should be created and used in BR (#4805)

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 4abc37899137b12f64df3a70484eda1954f365d2
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Mon Mar 16 17:35:35 2020 -0700

    PR.clear's event id should be created and used in BR (#4805)
    
    * GEODE-7857: The event id of PartitionedRegion.clear (PR.clear) should be created and then used in the BucketRegion (BR)
---
 .../PartitionedRegionPersistentClearDUnitTest.java |  2 +-
 .../codeAnalysis/sanctionedDataSerializables.txt   |  4 +-
 .../geode/internal/cache/PartitionedRegion.java    |  8 +--
 .../internal/cache/partitioned/ClearPRMessage.java | 12 ++--
 .../internal/cache/PartitionedRegionTest.java      | 65 ++++++++++++++++++++++
 5 files changed, 80 insertions(+), 11 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java
index 847699b..c758446 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java
@@ -21,6 +21,6 @@ import org.apache.geode.cache.RegionShortcut;
 public class PartitionedRegionPersistentClearDUnitTest extends PartitionedRegionClearDUnitTest {
 
   protected RegionShortcut getRegionShortCut() {
-    return RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW;
+    return RegionShortcut.PARTITION_REDUNDANT_PERSISTENT;
   }
 }
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index fb83c84..8e522a2 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1377,8 +1377,8 @@ fromData,27
 toData,27
 
 org/apache/geode/internal/cache/partitioned/ClearPRMessage,2
-fromData,19
-toData,36
+fromData,30
+toData,44
 
 org/apache/geode/internal/cache/partitioned/ClearPRMessage$ClearReplyMessage,2
 fromData,17
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 1aa427a..ffb01af 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -2191,7 +2191,7 @@ public class PartitionedRegion extends LocalRegion
         }
 
         // create ClearPRMessage per bucket
-        List<ClearPRMessage> clearMsgList = createClearPRMessages();
+        List<ClearPRMessage> clearMsgList = createClearPRMessages(regionEvent.getEventId());
         for (ClearPRMessage clearPRMessage : clearMsgList) {
           int bucketId = clearPRMessage.getBucketId();
           checkReadiness();
@@ -2363,10 +2363,10 @@ public class PartitionedRegion extends LocalRegion
     }
   }
 
-  List<ClearPRMessage> createClearPRMessages() {
+  List<ClearPRMessage> createClearPRMessages(EventID eventID) {
     ArrayList<ClearPRMessage> clearMsgList = new ArrayList<>();
-    for (int bucketId = 0; bucketId < this.totalNumberOfBuckets; bucketId++) {
-      ClearPRMessage clearPRMessage = new ClearPRMessage(bucketId);
+    for (int bucketId = 0; bucketId < getTotalNumberOfBuckets(); bucketId++) {
+      ClearPRMessage clearPRMessage = new ClearPRMessage(bucketId, eventID);
       clearMsgList.add(clearPRMessage);
     }
     return clearMsgList;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
index 9fa8057..cc01920 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
@@ -56,6 +56,8 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
 
   private Integer bucketId;
 
+  private EventID eventID;
+
   public static final String BUCKET_NON_PRIMARY_MESSAGE =
       "The bucket region on target member is no longer primary";
   public static final String EXCEPTION_THROWN_DURING_CLEAR_OPERATION =
@@ -71,8 +73,9 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
    */
   public ClearPRMessage() {}
 
-  public ClearPRMessage(int bucketId) {
+  public ClearPRMessage(int bucketId, EventID eventID) {
     this.bucketId = bucketId;
+    this.eventID = eventID;
   }
 
   public void initMessage(PartitionedRegion region, Set<InternalDistributedMember> recipients,
@@ -119,6 +122,7 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
     } else {
       InternalDataSerializer.writeSignedVL(bucketId, out);
     }
+    DataSerializer.writeObject(this.eventID, out);
   }
 
   @Override
@@ -126,6 +130,7 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
       throws IOException, ClassNotFoundException {
     super.fromData(in, context);
     this.bucketId = (int) InternalDataSerializer.readSignedVL(in);
+    this.eventID = (EventID) DataSerializer.readObject(in);
   }
 
   @Override
@@ -168,9 +173,8 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
       throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
     }
     try {
-      RegionEventImpl regionEvent = new RegionEventImpl();
-      regionEvent.setOperation(Operation.REGION_CLEAR);
-      regionEvent.setRegion(bucketRegion);
+      RegionEventImpl regionEvent = new RegionEventImpl(bucketRegion, Operation.REGION_CLEAR, null,
+          false, region.getMyId(), eventID);
       bucketRegion.cmnClearRegion(regionEvent, true, true);
     } catch (PartitionOfflineException poe) {
       logger.info(
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
index 742db8a..898c4f7 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
@@ -21,10 +21,12 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.catchThrowable;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -39,6 +41,7 @@ import static org.mockito.quality.Strictness.STRICT_STUBS;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -55,6 +58,7 @@ import org.mockito.junit.MockitoRule;
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.Statistics;
 import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.CacheLoader;
 import org.apache.geode.cache.CacheWriter;
 import org.apache.geode.cache.Operation;
@@ -71,6 +75,7 @@ import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.control.InternalResourceManager;
+import org.apache.geode.internal.cache.partitioned.ClearPRMessage;
 import org.apache.geode.internal.cache.partitioned.FetchKeysMessage;
 import org.apache.geode.internal.cache.partitioned.colocation.ColocationLoggerFactory;
 import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
@@ -208,6 +213,66 @@ public class PartitionedRegionTest {
   }
 
   @Test
+  public void clearShouldNotThrowUnsupportedOperationException() {
+    PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
+    doNothing().when(spyPartitionedRegion).checkReadiness();
+    doCallRealMethod().when(spyPartitionedRegion).basicClear(any());
+    doNothing().when(spyPartitionedRegion).basicClear(any(), anyBoolean());
+    spyPartitionedRegion.clear();
+  }
+
+  @Test(expected = CacheClosedException.class)
+  public void clearShouldThrowCacheClosedExceptionIfShutdownAll() {
+    PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
+    RegionEventImpl regionEvent =
+        new RegionEventImpl(spyPartitionedRegion, Operation.REGION_CLEAR, null, false,
+            spyPartitionedRegion.getMyId(), true);
+    when(cache.isCacheAtShutdownAll()).thenReturn(true);
+    when(cache.getCacheClosedException("Cache is shutting down"))
+        .thenReturn(new CacheClosedException("Cache is shutting down"));
+    DistributedLockService lockService = mock(DistributedLockService.class);
+    when(spyPartitionedRegion.getPartitionedRegionLockService()).thenReturn(lockService);
+    String lockName = "_clearOperation" + spyPartitionedRegion.getFullPath().replace('/', '_');
+    when(lockService.lock(lockName, -1, -1)).thenReturn(true);
+    spyPartitionedRegion.basicClear(regionEvent, true);
+  }
+
+  @Test
+  public void createClearPRMessagesShouldCreateMessagePerBucket() {
+    PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
+    RegionEventImpl regionEvent =
+        new RegionEventImpl(spyPartitionedRegion, Operation.REGION_CLEAR, null, false,
+            spyPartitionedRegion.getMyId(), true);
+    when(spyPartitionedRegion.getTotalNumberOfBuckets()).thenReturn(3);
+    EventID eventID = new EventID(spyPartitionedRegion.getCache().getDistributedSystem());
+    List<ClearPRMessage> msgs = spyPartitionedRegion.createClearPRMessages(eventID);
+    assertThat(msgs.size()).isEqualTo(3);
+  }
+
+  @Test
+  public void sendEachMessagePerBucket() {
+    PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
+    RegionEventImpl regionEvent =
+        new RegionEventImpl(spyPartitionedRegion, Operation.REGION_CLEAR, null, false,
+            spyPartitionedRegion.getMyId(), true);
+    when(cache.isCacheAtShutdownAll()).thenReturn(false);
+    DistributedLockService lockService = mock(DistributedLockService.class);
+    when(spyPartitionedRegion.getPartitionedRegionLockService()).thenReturn(lockService);
+    when(spyPartitionedRegion.getTotalNumberOfBuckets()).thenReturn(3);
+    String lockName = "_clearOperation" + spyPartitionedRegion.getFullPath().replace('/', '_');
+    when(lockService.lock(lockName, -1, -1)).thenReturn(true);
+    when(spyPartitionedRegion.hasListener()).thenReturn(true);
+    doNothing().when(spyPartitionedRegion).dispatchListenerEvent(any(), any());
+    doNothing().when(spyPartitionedRegion).notifyBridgeClients(eq(regionEvent));
+    doNothing().when(spyPartitionedRegion).checkReadiness();
+    doNothing().when(lockService).unlock(lockName);
+    spyPartitionedRegion.basicClear(regionEvent, true);
+    verify(spyPartitionedRegion, times(3)).sendClearMsgByBucket(any(), any());
+    verify(spyPartitionedRegion, times(1)).dispatchListenerEvent(any(), any());
+    verify(spyPartitionedRegion, times(1)).notifyBridgeClients(eq(regionEvent));
+  }
+
+  @Test
   public void getBucketNodeForReadOrWriteReturnsPrimaryNodeForRegisterInterest() {
     // ARRANGE
     EntryEventImpl clientEvent = mock(EntryEventImpl.class);