Posted to commits@geode.apache.org by jj...@apache.org on 2020/04/17 08:33:06 UTC

[geode] branch develop updated: GEODE-7940: Fix Tracking of ShadowBuckets Destroyed (#4934)

This is an automated email from the ASF dual-hosted git repository.

jjramos pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new bfbb398  GEODE-7940: Fix Tracking of ShadowBuckets Destroyed (#4934)
bfbb398 is described below

commit bfbb398891c5d96fa3a5975365b29d71bd849ad6
Author: Juan José Ramos <ju...@users.noreply.github.com>
AuthorDate: Fri Apr 17 09:32:22 2020 +0100

    GEODE-7940: Fix Tracking of ShadowBuckets Destroyed (#4934)
    
    The BucketAdvisor can now keep track of more than just one shadow
    bucket to avoid incorrectly marking all of them as destroyed.
    
    - Added unit and distributed tests.
---
 .../apache/geode/internal/cache/BucketAdvisor.java |  20 +++-
 .../internal/cache/PartitionedRegionDataStore.java |  10 +-
 .../wan/parallel/ParallelGatewaySenderQueue.java   |  99 +++++++++---------
 .../geode/internal/cache/BucketAdvisorTest.java    |  80 +++++++++++++-
 .../ParallelGatewaySenderOperationsDUnitTest.java  | 116 +++++++++++++++++++++
 5 files changed, 261 insertions(+), 64 deletions(-)
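
The commit message above is terse, so a minimal standalone sketch of what the
BucketAdvisor change below boils down to may help: the single
"volatile boolean shadowBucketDestroyed" flag is replaced by a map keyed by
shadow-bucket path, so destroying one sender's shadow bucket no longer marks
every colocated sender's shadow bucket as destroyed. Field and method names
mirror the diff; everything else (locking, advisor plumbing) is stripped out,
so treat this as an illustration rather than the Geode class itself.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class ShadowBucketTracker {
      // One entry per shadow bucket (keyed by its full region path), instead of a
      // single boolean shared by every shadow bucket colocated with this bucket.
      final ConcurrentMap<String, Boolean> destroyedShadowBuckets = new ConcurrentHashMap<>();

      void markAllShadowBucketsAsNonDestroyed() {
        destroyedShadowBuckets.clear();
      }

      void markAllShadowBucketsAsDestroyed() {
        // Flips only the shadow buckets this tracker already knows about.
        destroyedShadowBuckets.forEach((path, destroyed) -> destroyedShadowBuckets.put(path, true));
      }

      void markShadowBucketAsDestroyed(String shadowBucketPath) {
        destroyedShadowBuckets.put(shadowBucketPath, true);
      }

      boolean isShadowBucketDestroyed(String shadowBucketPath) {
        // A path that was never marked is treated as "not destroyed".
        return destroyedShadowBuckets.getOrDefault(shadowBucketPath, false);
      }
    }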

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index 822c20e..e4045c3 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -33,6 +33,8 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -164,7 +166,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
 
   private PartitionedRegion pRegion;
 
-  private volatile boolean shadowBucketDestroyed;
+  final ConcurrentMap<String, Boolean> destroyedShadowBuckets = new ConcurrentHashMap<>();
 
   /**
    * Constructs a new BucketAdvisor for the Bucket owned by RegionAdvisor.
@@ -2742,11 +2744,19 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
     }
   }
 
-  void setShadowBucketDestroyed(boolean destroyed) {
-    shadowBucketDestroyed = destroyed;
+  void markAllShadowBucketsAsNonDestroyed() {
+    destroyedShadowBuckets.clear();
   }
 
-  public boolean getShadowBucketDestroyed() {
-    return shadowBucketDestroyed;
+  void markAllShadowBucketsAsDestroyed() {
+    destroyedShadowBuckets.forEach((k, v) -> destroyedShadowBuckets.put(k, true));
+  }
+
+  void markShadowBucketAsDestroyed(String shadowBucketPath) {
+    destroyedShadowBuckets.put(shadowBucketPath, true);
+  }
+
+  public boolean isShadowBucketDestroyed(String shadowBucketPath) {
+    return destroyedShadowBuckets.getOrDefault(shadowBucketPath, false);
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index 1506de2..23a7487 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -451,11 +451,11 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
               try {
                 buk.initializePrimaryElector(creationRequestor);
                 if (getPartitionedRegion().getColocatedWith() == null) {
-                  buk.getBucketAdvisor().setShadowBucketDestroyed(false);
+                  buk.getBucketAdvisor().markAllShadowBucketsAsNonDestroyed();
                 }
                 if (getPartitionedRegion().isShadowPR()) {
                   getPartitionedRegion().getColocatedWithRegion().getRegionAdvisor()
-                      .getBucketAdvisor(possiblyFreeBucketId).setShadowBucketDestroyed(false);
+                      .getBucketAdvisor(possiblyFreeBucketId).markAllShadowBucketsAsNonDestroyed();
                 }
                 bukReg = createBucketRegion(possiblyFreeBucketId);
                 // Mark the bucket as hosting and distribute to peers
@@ -472,7 +472,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
                   bukReg.invokePartitionListenerAfterBucketCreated();
                 } else {
                   if (buk.getPartitionedRegion().getColocatedWith() == null) {
-                    buk.getBucketAdvisor().setShadowBucketDestroyed(true);
+                    buk.getBucketAdvisor().markAllShadowBucketsAsDestroyed();
                     // clear tempQueue for all the shadowPR buckets
                     clearAllTempQueueForShadowPR(buk.getBucketId());
                   }
@@ -1423,7 +1423,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
                     && buk.getPartitionedRegion().isShadowPR()) {
                   if (buk.getPartitionedRegion().getColocatedWithRegion() != null) {
                     buk.getPartitionedRegion().getColocatedWithRegion().getRegionAdvisor()
-                        .getBucketAdvisor(bucketId).setShadowBucketDestroyed(true);
+                        .getBucketAdvisor(bucketId).markShadowBucketAsDestroyed(buk.getFullPath());
                   }
                 }
               } catch (RegionDestroyedException ignore) {
@@ -1593,7 +1593,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
         if (bucketRegion.getPartitionedRegion().isShadowPR()) {
           if (bucketRegion.getPartitionedRegion().getColocatedWithRegion() != null) {
             bucketRegion.getPartitionedRegion().getColocatedWithRegion().getRegionAdvisor()
-                .getBucketAdvisor(bucketId).setShadowBucketDestroyed(true);
+                .getBucketAdvisor(bucketId).markAllShadowBucketsAsDestroyed();
           }
         }
         bucketAdvisor.getProxyBucketRegion().removeBucket();
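
The practical effect of the PartitionedRegionDataStore changes above: the
bucket-creation path clears the map (nothing destroyed), the bucket-removal
path marks every known shadow bucket as destroyed, and destroying a single
shadow bucket region now marks only that bucket's full path on the colocated
leader's advisor. The scenario below walks through that with a plain map
standing in for the advisor's destroyedShadowBuckets; the two paths are made
up for the example and do not follow Geode's real bucket naming.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class ShadowBucketLifecycleDemo {
      public static void main(String[] args) {
        // Stands in for BucketAdvisor.destroyedShadowBuckets on the leader bucket.
        ConcurrentMap<String, Boolean> destroyed = new ConcurrentHashMap<>();

        // Hypothetical shadow-bucket paths for two senders colocated with the
        // same data bucket (bucket id 89).
        String senderA = "/__PR/senderA-shadow-bucket-89";
        String senderB = "/__PR/senderB-shadow-bucket-89";

        // Bucket creation: markAllShadowBucketsAsNonDestroyed() -> clear().
        destroyed.clear();

        // Sender A's shadow bucket region is destroyed (e.g. moveBucket):
        // markShadowBucketAsDestroyed(buk.getFullPath()) -> put(path, true).
        destroyed.put(senderA, Boolean.TRUE);

        // The old single boolean would now report "destroyed" for both senders;
        // with per-path tracking, sender B keeps dispatching.
        System.out.println(destroyed.getOrDefault(senderA, false)); // true
        System.out.println(destroyed.getOrDefault(senderB, false)); // false

        // Removing the whole data bucket: markAllShadowBucketsAsDestroyed()
        // flips every shadow bucket that is already being tracked.
        destroyed.forEach((path, flag) -> destroyed.put(path, true));
      }
    }
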
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index a62eae6..97d64a7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -94,21 +94,18 @@ import org.apache.geode.management.internal.beans.AsyncEventQueueMBean;
 import org.apache.geode.management.internal.beans.GatewaySenderMBean;
 
 public class ParallelGatewaySenderQueue implements RegionQueue {
-
   protected static final Logger logger = LogService.getLogger();
-
-  protected final Map<String, PartitionedRegion> userRegionNameToshadowPRMap =
-      new ConcurrentHashMap<String, PartitionedRegion>();
+  protected final Map<String, PartitionedRegion> userRegionNameToShadowPRMap =
+      new ConcurrentHashMap<>();
+  private static final String SHADOW_BUCKET_PATH_PREFIX =
+      Region.SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME + Region.SEPARATOR;
 
   // <PartitionedRegion, Map<Integer, List<Object>>>
   private final Map regionToDispatchedKeysMap = new ConcurrentHashMap();
 
   protected final StoppableReentrantLock buckToDispatchLock;
-
   private final StoppableCondition regionToDispatchedKeysMapEmpty;
-
   protected final StoppableReentrantLock queueEmptyLock;
-
   private volatile boolean isQueueEmpty = true;
 
   /**
@@ -197,7 +194,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
       try {
         String regionPath =
             ColocationHelper.getLeaderRegion((PartitionedRegion) event.getRegion()).getFullPath();
-        prQ = userRegionNameToshadowPRMap.get(regionPath);
+        prQ = userRegionNameToShadowPRMap.get(regionPath);
         destroyEventFromQueue(prQ, bucketId, previousTailKeyTobeRemoved);
       } catch (EntryNotFoundException e) {
         if (logger.isDebugEnabled()) {
@@ -313,7 +310,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     try {
       String regionName = userRegion.getFullPath();
 
-      if (this.userRegionNameToshadowPRMap.containsKey(regionName))
+      if (this.userRegionNameToShadowPRMap.containsKey(regionName))
         return;
 
       InternalCache cache = sender.getCache();
@@ -408,7 +405,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
       }
     } finally {
       if (prQ != null) {
-        this.userRegionNameToshadowPRMap.put(userRegion.getFullPath(), prQ);
+        this.userRegionNameToShadowPRMap.put(userRegion.getFullPath(), prQ);
       }
       this.sender.getLifeCycleLock().writeLock().unlock();
     }
@@ -436,13 +433,13 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         // to leader PR)
         // though, internally, colocate the GatewaySender's shadowPR with the leader PR in
         // colocation chain
-        if (!this.userRegionNameToshadowPRMap.containsKey(leaderRegionName)) {
+        if (!this.userRegionNameToShadowPRMap.containsKey(leaderRegionName)) {
           addShadowPartitionedRegionForUserPR(ColocationHelper.getLeaderRegion(userPR));
         }
         return;
       }
 
-      if (this.userRegionNameToshadowPRMap.containsKey(regionName))
+      if (this.userRegionNameToShadowPRMap.containsKey(regionName))
         return;
 
       if (userPR.getDataPolicy().withPersistence() && !sender.isPersistenceEnabled()) {
@@ -539,7 +536,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
     } finally {
       if (prQ != null) {
-        this.userRegionNameToshadowPRMap.put(userPR.getFullPath(), prQ);
+        this.userRegionNameToShadowPRMap.put(userPR.getFullPath(), prQ);
       }
       /*
        * Here, enqueueTempEvents need to be invoked when a sender is already running and userPR is
@@ -663,9 +660,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     if (isDebugEnabled) {
       logger.debug("Put is for the region {}", region);
     }
-    if (!this.userRegionNameToshadowPRMap.containsKey(regionPath)) {
+    if (!this.userRegionNameToShadowPRMap.containsKey(regionPath)) {
       if (isDebugEnabled) {
-        logger.debug("The userRegionNameToshadowPRMap is {}", userRegionNameToshadowPRMap);
+        logger.debug("The userRegionNameToshadowPRMap is {}", userRegionNameToShadowPRMap);
       }
       logger.warn(
           "GatewaySender: Not queuing the event {}, as the region for which this event originated is not yet configured in the GatewaySender",
@@ -674,7 +671,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
       return false;
     }
 
-    PartitionedRegion prQ = this.userRegionNameToshadowPRMap.get(regionPath);
+    PartitionedRegion prQ = this.userRegionNameToShadowPRMap.get(regionPath);
     int bucketId = value.getBucketId();
     Object key = null;
     if (!isDREvent) {
@@ -703,16 +700,14 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
     try {
       if (brq == null) {
+        // Full path of the bucket
+        final String bucketFullPath = SHADOW_BUCKET_PATH_PREFIX + prQ.getBucketName(bucketId);
+
         // Set the threadInitLevel to BEFORE_INITIAL_IMAGE.
         final InitializationLevel oldLevel =
             LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE);
-        try {
-          // Full path of the bucket:
-
-          final String bucketFullPath =
-              Region.SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME + Region.SEPARATOR
-                  + prQ.getBucketName(bucketId);
 
+        try {
           brq = (AbstractBucketRegionQueue) prQ.getCache().getInternalRegionByPath(bucketFullPath);
           if (isDebugEnabled) {
             logger.debug(
@@ -739,8 +734,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
             // chain is getting destroyed one by one starting from child region
             // i.e this bucket due to moveBucket operation
             // In that case we don't want to store this event.
-            if (((PartitionedRegion) prQ.getColocatedWithRegion()).getRegionAdvisor()
-                .getBucketAdvisor(bucketId).getShadowBucketDestroyed()) {
+            if (prQ.getColocatedWithRegion().getRegionAdvisor().getBucketAdvisor(bucketId)
+                .isShadowBucketDestroyed(bucketFullPath)) {
               if (isDebugEnabled) {
                 logger.debug(
                     "ParallelGatewaySenderOrderedQueue not putting key {} : Value : {} as shadowPR bucket is destroyed.",
@@ -790,17 +785,16 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
           LocalRegion.setThreadInitLevelRequirement(oldLevel);
         }
       } else {
-        boolean thisbucketDestroyed = false;
+        boolean thisBucketDestroyed = brq.isDestroyed();
 
         if (!isDREvent) {
-          thisbucketDestroyed =
-              ((PartitionedRegion) prQ.getColocatedWithRegion()).getRegionAdvisor()
-                  .getBucketAdvisor(bucketId).getShadowBucketDestroyed() || brq.isDestroyed();
-        } else {
-          thisbucketDestroyed = brq.isDestroyed();
+          // Full path of the bucket
+          final String bucketFullPath = SHADOW_BUCKET_PATH_PREFIX + prQ.getBucketName(bucketId);
+          thisBucketDestroyed |= prQ.getColocatedWithRegion().getRegionAdvisor()
+              .getBucketAdvisor(bucketId).isShadowBucketDestroyed(bucketFullPath);
         }
 
-        if (!thisbucketDestroyed) {
+        if (!thisBucketDestroyed) {
           putIntoBucketRegionQueue(brq, key, value);
           putDone = true;
         } else {
@@ -815,6 +809,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     } finally {
       notifyEventProcessorIfRequired();
     }
+
     return putDone;
   }
 
@@ -874,19 +869,19 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
    */
   @Override
   public Region getRegion() {
-    return this.userRegionNameToshadowPRMap.size() == 1
-        ? (Region) this.userRegionNameToshadowPRMap.values().toArray()[0] : null;
+    return this.userRegionNameToShadowPRMap.size() == 1
+        ? (Region) this.userRegionNameToShadowPRMap.values().toArray()[0] : null;
   }
 
   public PartitionedRegion getRegion(String fullpath) {
-    return this.userRegionNameToshadowPRMap.get(fullpath);
+    return this.userRegionNameToShadowPRMap.get(fullpath);
   }
 
   public PartitionedRegion removeShadowPR(String fullpath) {
     try {
       this.sender.getLifeCycleLock().writeLock().lock();
       this.sender.setEnqueuedAllTempQueueEvents(false);
-      return this.userRegionNameToshadowPRMap.remove(fullpath);
+      return this.userRegionNameToShadowPRMap.remove(fullpath);
     } finally {
       sender.getLifeCycleLock().writeLock().unlock();
     }
@@ -900,15 +895,15 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
    * Returns the set of shadowPR backign this queue.
    */
   public Set<PartitionedRegion> getRegions() {
-    return new HashSet(this.userRegionNameToshadowPRMap.values());
+    return new HashSet(this.userRegionNameToShadowPRMap.values());
   }
 
   // TODO: Find optimal way to get Random shadow pr as this will be called in each put and peek.
   protected PartitionedRegion getRandomShadowPR() {
     PartitionedRegion prQ = null;
-    if (this.userRegionNameToshadowPRMap.values().size() > 0) {
-      int randomIndex = new Random().nextInt(this.userRegionNameToshadowPRMap.size());
-      prQ = (PartitionedRegion) this.userRegionNameToshadowPRMap.values().toArray()[randomIndex];
+    if (this.userRegionNameToShadowPRMap.values().size() > 0) {
+      int randomIndex = new Random().nextInt(this.userRegionNameToShadowPRMap.size());
+      prQ = (PartitionedRegion) this.userRegionNameToShadowPRMap.values().toArray()[randomIndex];
     }
     return prQ;
   }
@@ -930,7 +925,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
   protected boolean areLocalBucketQueueRegionsPresent() {
     boolean bucketsAvailable = false;
-    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+    for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
       if (prQ.getDataStore().getAllLocalBucketRegions().size() > 0)
         return true;
     }
@@ -994,11 +989,11 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         Object key = null;
         if (event.getRegion() != null) {
           if (isDREvent(sender.getCache(), event)) {
-            prQ = this.userRegionNameToshadowPRMap.get(event.getRegion().getFullPath());
+            prQ = this.userRegionNameToShadowPRMap.get(event.getRegion().getFullPath());
             bucketId = event.getEventId().getBucketID();
             key = event.getEventId();
           } else {
-            prQ = this.userRegionNameToshadowPRMap.get(ColocationHelper
+            prQ = this.userRegionNameToShadowPRMap.get(ColocationHelper
                 .getLeaderRegion((PartitionedRegion) event.getRegion()).getFullPath());
             bucketId = event.getBucketId();
             key = event.getShadowKey();
@@ -1010,11 +1005,11 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
           if (region != null && !region.isDestroyed()) {
             // TODO: We have to get colocated parent region for this region
             if (region instanceof DistributedRegion) {
-              prQ = this.userRegionNameToshadowPRMap.get(region.getFullPath());
+              prQ = this.userRegionNameToShadowPRMap.get(region.getFullPath());
               event.getBucketId();
               key = event.getEventId();
             } else {
-              prQ = this.userRegionNameToshadowPRMap
+              prQ = this.userRegionNameToShadowPRMap
                   .get(ColocationHelper.getLeaderRegion((PartitionedRegion) region).getFullPath());
               event.getBucketId();
               key = event.getShadowKey();
@@ -1429,7 +1424,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   public String displayContent() {
     int size = 0;
     StringBuffer sb = new StringBuffer();
-    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+    for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
       if (prQ != null && prQ.getDataStore() != null) {
         Set<BucketRegion> allLocalBuckets = prQ.getDataStore().getAllLocalBucketRegions();
         for (BucketRegion br : allLocalBuckets) {
@@ -1448,7 +1443,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
   public int localSize(boolean includeSecondary) {
     int size = 0;
-    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+    for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
       if (prQ != null && prQ.getDataStore() != null) {
         if (includeSecondary) {
           size += prQ.getDataStore().getSizeOfLocalBuckets();
@@ -1466,7 +1461,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
   public int localSizeForProcessor() {
     int size = 0;
-    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+    for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
       if (((PartitionedRegion) prQ.getRegion()).getDataStore() != null) {
         Set<BucketRegion> primaryBuckets =
             ((PartitionedRegion) prQ.getRegion()).getDataStore().getAllLocalPrimaryBucketRegions();
@@ -1487,7 +1482,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   @Override
   public int size() {
     int size = 0;
-    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+    for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
       if (logger.isDebugEnabled()) {
         logger.debug("The name of the queue region is {} and the size is {}. keyset size is {}",
             prQ.getName(), prQ.size(), prQ.keys().size());
@@ -1500,7 +1495,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
   @Override
   public void addCacheListener(CacheListener listener) {
-    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+    for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
       AttributesMutator mutator = prQ.getAttributesMutator();
       mutator.addCacheListener(listener);
     }
@@ -1526,7 +1521,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
   public long getNumEntriesOverflowOnDiskTestOnly() {
     long numEntriesOnDisk = 0;
-    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+    for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
       DiskRegionStats diskStats = prQ.getDiskRegionStats();
       if (diskStats == null) {
         if (logger.isDebugEnabled()) {
@@ -1548,7 +1543,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
   public long getNumEntriesInVMTestOnly() {
     long numEntriesInVM = 0;
-    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+    for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
       DiskRegionStats diskStats = prQ.getDiskRegionStats();
       if (diskStats == null) {
         if (logger.isDebugEnabled()) {
@@ -1835,7 +1830,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
   public long estimateMemoryFootprint(SingleObjectSizer sizer) {
     return sizer.sizeof(this) + sizer.sizeof(regionToDispatchedKeysMap)
-        + sizer.sizeof(userRegionNameToshadowPRMap) + sizer.sizeof(bucketToTempQueueMap)
+        + sizer.sizeof(userRegionNameToShadowPRMap) + sizer.sizeof(bucketToTempQueueMap)
         + sizer.sizeof(peekedEvents) + sizer.sizeof(conflationExecutor);
   }
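
In ParallelGatewaySenderQueue above, the bucket path computation is hoisted
into SHADOW_BUCKET_PATH_PREFIX, and the resulting bucketFullPath is what gets
passed to isShadowBucketDestroyed() on the colocated leader region's bucket
advisor. Because each sender has its own shadow partitioned region, the same
bucket id yields a different full path per sender, which is what makes the
per-path map meaningful. The sketch inlines the two constants ("/" for
Region.SEPARATOR, "__PR" for PartitionedRegionHelper.PR_ROOT_REGION_NAME) and
invents the bucket-name format, so the exact strings are assumptions.

    class ShadowBucketPathDemo {
      private static final String SEPARATOR = "/";               // Region.SEPARATOR
      private static final String PR_ROOT_REGION_NAME = "__PR";  // PartitionedRegionHelper
      private static final String SHADOW_BUCKET_PATH_PREFIX =
          SEPARATOR + PR_ROOT_REGION_NAME + SEPARATOR;

      // Stand-in for prQ.getBucketName(bucketId); the real format differs, the
      // point is only that it embeds the shadow PR's name, so colocated senders
      // get distinct paths for the same bucket id.
      static String bucketName(String shadowPrName, int bucketId) {
        return shadowPrName + "_" + bucketId;
      }

      public static void main(String[] args) {
        int bucketId = 89;
        String pathForSenderA = SHADOW_BUCKET_PATH_PREFIX + bucketName("senderA_QUEUE", bucketId);
        String pathForSenderB = SHADOW_BUCKET_PATH_PREFIX + bucketName("senderB_QUEUE", bucketId);
        // put(...) asks the leader bucket's advisor
        //   isShadowBucketDestroyed(bucketFullPath)
        // so destroying sender A's shadow bucket cannot silence sender B's queue.
        System.out.println(pathForSenderA); // /__PR/senderA_QUEUE_89
        System.out.println(pathForSenderB); // /__PR/senderB_QUEUE_89
      }
    }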
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
index 2eac9ac..d49430b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
@@ -14,8 +14,10 @@
  */
 package org.apache.geode.internal.cache;
 
+import static com.google.common.collect.ImmutableMap.of;
 import static org.apache.geode.internal.cache.CacheServerImpl.CACHE_SERVER_BIND_ADDRESS_NOT_AVAILABLE_EXCEPTION_MESSAGE;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doCallRealMethod;
@@ -28,6 +30,7 @@ import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -59,6 +62,7 @@ public class BucketAdvisorTest {
     ProxyBucketRegion mockBucket = mock(ProxyBucketRegion.class);
     RegionAdvisor mockRegionAdvisor = mock(RegionAdvisor.class);
     PartitionedRegion mockPartitionedRegion = mock(PartitionedRegion.class);
+    @SuppressWarnings("rawtypes")
     PartitionAttributes mockPartitionAttributes = mock(PartitionAttributes.class);
     DistributionManager mockDistributionManager = mock(DistributionManager.class);
     List<CacheServer> cacheServers = new ArrayList<>();
@@ -80,12 +84,13 @@ public class BucketAdvisorTest {
     assertThat(bucketAdvisor.getBucketServerLocations(0).size()).isEqualTo(0);
   }
 
-  @Test(expected = IllegalStateException.class)
+  @Test
   public void whenServerThrowsIllegalStateExceptionWithoutBindAddressMsgThenExceptionMustBeThrown() {
     InternalCache mockCache = mock(InternalCache.class);
     ProxyBucketRegion mockBucket = mock(ProxyBucketRegion.class);
     RegionAdvisor mockRegionAdvisor = mock(RegionAdvisor.class);
     PartitionedRegion mockPartitionedRegion = mock(PartitionedRegion.class);
+    @SuppressWarnings("rawtypes")
     PartitionAttributes mockPartitionAttributes = mock(PartitionAttributes.class);
     DistributionManager mockDistributionManager = mock(DistributionManager.class);
     List<CacheServer> cacheServers = new ArrayList<>();
@@ -103,7 +108,8 @@ public class BucketAdvisorTest {
     when(mockCacheServer.getExternalAddress()).thenThrow(new IllegalStateException());
 
     BucketAdvisor bucketAdvisor = BucketAdvisor.createBucketAdvisor(mockBucket, mockRegionAdvisor);
-    bucketAdvisor.getBucketServerLocations(0).size();
+    assertThatThrownBy(() -> bucketAdvisor.getBucketServerLocations(0))
+        .isInstanceOf(IllegalStateException.class);
   }
 
   @Test
@@ -150,4 +156,74 @@ public class BucketAdvisorTest {
     advisorSpy.volunteerForPrimary();
     verify(volunteeringDelegate).volunteerForPrimary();
   }
+
+  BucketAdvisor mockBucketAdvisorWithShadowBucketsDestroyedMap(Map<String, Boolean> shadowBuckets) {
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    when(distributionManager.getId()).thenReturn(new InternalDistributedMember("localhost", 321));
+
+    Bucket bucket = mock(Bucket.class);
+    when(bucket.isHosting()).thenReturn(true);
+    when(bucket.isPrimary()).thenReturn(false);
+    when(bucket.getDistributionManager()).thenReturn(distributionManager);
+
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+    when(partitionedRegion.getRedundantCopies()).thenReturn(0);
+    when(partitionedRegion.getPartitionAttributes()).thenReturn(new PartitionAttributesImpl());
+    RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+    when(regionAdvisor.getPartitionedRegion()).thenReturn(partitionedRegion);
+
+    BucketAdvisor bucketAdvisor = BucketAdvisor.createBucketAdvisor(bucket, regionAdvisor);
+    bucketAdvisor.destroyedShadowBuckets.putAll(shadowBuckets);
+
+    return bucketAdvisor;
+  }
+
+  @Test
+  public void markAllShadowBucketsAsNonDestroyedShouldClearTheShadowBucketsDestroyedMap() {
+    Map<String, Boolean> buckets = of("/b1", false, "/b2", true);
+    BucketAdvisor bucketAdvisor = mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
+
+    assertThat(bucketAdvisor.destroyedShadowBuckets).isNotEmpty();
+    bucketAdvisor.markAllShadowBucketsAsNonDestroyed();
+    assertThat(bucketAdvisor.destroyedShadowBuckets).isEmpty();
+  }
+
+  @Test
+  public void markAllShadowBucketsAsDestroyedShouldSetTheFlagAsTrueForEveryKnownShadowBucket() {
+    Map<String, Boolean> buckets = of("/b1", false, "/b2", false, "/b3", false);
+    BucketAdvisor bucketAdvisor = mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
+
+    bucketAdvisor.destroyedShadowBuckets.forEach((k, v) -> assertThat(v).isFalse());
+    bucketAdvisor.markAllShadowBucketsAsDestroyed();
+    bucketAdvisor.destroyedShadowBuckets.forEach((k, v) -> assertThat(v).isTrue());
+  }
+
+  @Test
+  public void markShadowBucketAsDestroyedShouldSetTheFlagAsTrueOnlyForTheSpecificBucket() {
+    Map<String, Boolean> buckets = of("/b1", false);
+    BucketAdvisor bucketAdvisor = mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
+
+    // Known Shadow Bucket
+    assertThat(bucketAdvisor.destroyedShadowBuckets.get("/b1")).isFalse();
+    bucketAdvisor.markShadowBucketAsDestroyed("/b1");
+    assertThat(bucketAdvisor.destroyedShadowBuckets.get("/b1")).isTrue();
+
+    // Unknown Shadow Bucket
+    assertThat(bucketAdvisor.destroyedShadowBuckets.get("/b5")).isNull();
+    bucketAdvisor.markShadowBucketAsDestroyed("/b5");
+    assertThat(bucketAdvisor.destroyedShadowBuckets.get("/b5")).isTrue();
+  }
+
+  @Test
+  public void isShadowBucketDestroyedShouldReturnCorrectly() {
+    Map<String, Boolean> buckets = of("/b1", true, "/b2", false);
+    BucketAdvisor bucketAdvisor = mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
+
+    // Known Shadow Buckets
+    assertThat(bucketAdvisor.isShadowBucketDestroyed("/b1")).isTrue();
+    assertThat(bucketAdvisor.isShadowBucketDestroyed("/b2")).isFalse();
+
+    // Unknown Shadow Bucket
+    assertThat(bucketAdvisor.isShadowBucketDestroyed("/b5")).isFalse();
+  }
 }
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index 1d0ad7b..ff5f0f5 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -15,7 +15,9 @@
 package org.apache.geode.internal.cache.wan.parallel;
 
 import static org.apache.geode.distributed.internal.DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME;
+import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPortsForDUnitSite;
 import static org.apache.geode.internal.cache.tier.sockets.Message.MAX_MESSAGE_SIZE_PROPERTY;
+import static org.apache.geode.internal.util.ArrayUtils.asList;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -25,14 +27,21 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import org.junit.Before;
 import org.junit.Rule;
@@ -63,6 +72,7 @@ import org.apache.geode.test.dunit.rules.ClusterStartupRule;
 import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
 import org.apache.geode.test.dunit.rules.MemberVM;
 import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
 
 /**
  * DUnit test for operations on ParallelGatewaySender
@@ -72,6 +82,9 @@ import org.apache.geode.test.junit.categories.WanTest;
 public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
 
   @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Rule
   public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
 
   @Rule
@@ -643,6 +656,109 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
   }
 
   @Test
+  public void destroyParallelGatewaySenderShouldNotStopDispatchingFromOtherSendersAttachedToTheRegion() {
+    String site2SenderId = "site2-sender";
+    String site3SenderId = "site3-sender";
+    String regionName = testName.getMethodName();
+    int[] ports = getRandomAvailableTCPPortsForDUnitSite(3);
+    int site1Port = ports[0];
+    int site2Port = ports[1];
+    int site3Port = ports[2];
+    Set<String> site1RemoteLocators =
+        Stream.of("localhost[" + site2Port + "]", "localhost[" + site3Port + "]")
+            .collect(Collectors.toSet());
+    Set<String> site2RemoteLocators =
+        Stream.of("localhost[" + site1Port + "]", "localhost[" + site3Port + "]")
+            .collect(Collectors.toSet());
+    Set<String> site3RemoteLocators =
+        Stream.of("localhost[" + site1Port + "]", "localhost[" + site2Port + "]")
+            .collect(Collectors.toSet());
+
+    // Start 3 sites.
+    vm0.invoke(() -> createLocator(1, site1Port,
+        Collections.singleton("localhost[" + site1Port + "]"), site1RemoteLocators));
+    vm1.invoke(() -> createLocator(2, site2Port,
+        Collections.singleton("localhost[" + site2Port + "]"), site2RemoteLocators));
+    vm2.invoke(() -> createLocator(3, site3Port,
+        Collections.singleton("localhost[" + site3Port + "]"), site3RemoteLocators));
+
+    // Create the cache on the 3 sites.
+    createCacheInVMs(site1Port, vm3);
+    createCacheInVMs(site2Port, vm4);
+    createCacheInVMs(site3Port, vm5);
+
+    // Create receiver and region on sites 2 and 3.
+    asList(vm4, vm5).forEach(vm -> vm.invoke(() -> {
+      createReceiver();
+      createPartitionedRegion(regionName, null, 1, 113, isOffHeap());
+    }));
+
+    // Create senders and partitioned region on site 1.
+    vm3.invoke(() -> {
+      createSender(site2SenderId, 2, true, 100, 20, false, false, null, false);
+      createSender(site3SenderId, 3, true, 100, 20, false, false, null, false);
+      waitForSenderRunningState(site2SenderId);
+      waitForSenderRunningState(site3SenderId);
+
+      createPartitionedRegion(regionName, String.join(",", site2SenderId, site3SenderId), 1, 113,
+          isOffHeap());
+    });
+
+    // #################################################################################### //
+
+    final int FIRST_BATCH = 100;
+    final int SECOND_BATCH = 200;
+    final Map<String, String> firstBatch = new HashMap<>();
+    IntStream.range(0, FIRST_BATCH).forEach(i -> firstBatch.put("Key" + i, "Value" + i));
+    final Map<String, String> secondBatch = new HashMap<>();
+    IntStream.range(FIRST_BATCH, SECOND_BATCH)
+        .forEach(i -> secondBatch.put("Key" + i, "Value" + i));
+
+    // Insert first batch and wait until the queues are empty.
+    vm3.invoke(() -> {
+      cache.getRegion(regionName).putAll(firstBatch);
+      checkQueueSize(site2SenderId, 0);
+      checkQueueSize(site3SenderId, 0);
+    });
+
+    // Wait until sites 2 and 3 have received all updates.
+    asList(vm4, vm5).forEach(vm -> vm.invoke(() -> {
+      Region<String, String> region = cache.getRegion(regionName);
+      await().untilAsserted(() -> assertThat(region.size()).isEqualTo(FIRST_BATCH));
+      firstBatch.forEach((key, value) -> assertThat(region.get(key)).isEqualTo(value));
+    }));
+
+    // Stop sender to site3, remove it from the region and destroy it.
+    vm3.invoke(() -> {
+      stopSender(site3SenderId);
+      removeSenderFromTheRegion(site3SenderId, regionName);
+      destroySender(site3SenderId);
+      verifySenderDestroyed(site3SenderId, true);
+    });
+
+    // Insert second batch and wait until the queue is empty.
+    vm3.invoke(() -> {
+      cache.getRegion(regionName).putAll(secondBatch);
+      checkQueueSize(site2SenderId, 0);
+    });
+
+    // Site 3 should only have the first batch.
+    vm5.invoke(() -> {
+      Region<String, String> region = cache.getRegion(regionName);
+      await().untilAsserted(() -> assertThat(region.size()).isEqualTo(FIRST_BATCH));
+      firstBatch.forEach((key, value) -> assertThat(region.get(key)).isEqualTo(value));
+    });
+
+    // Site 2 should have both batches.
+    vm4.invoke(() -> {
+      Region<String, String> region = cache.getRegion(regionName);
+      await().untilAsserted(() -> assertThat(region.size()).isEqualTo(SECOND_BATCH));
+      firstBatch.forEach((key, value) -> assertThat(region.get(key)).isEqualTo(value));
+      secondBatch.forEach((key, value) -> assertThat(region.get(key)).isEqualTo(value));
+    });
+  }
+
+  @Test
   public void testParallelGatewaySenderMessageTooLargeException() {
     vm4.invoke(() -> System.setProperty(MAX_MESSAGE_SIZE_PROPERTY, String.valueOf(1024 * 1024)));