You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mi...@apache.org on 2022/06/29 20:38:34 UTC

[geode] branch develop updated: GEODE-9997: added ParallelQueueSetPossibleDuplicateMessage (#7323)

This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new ef7dc45dd2 GEODE-9997: added ParallelQueueSetPossibleDuplicateMessage (#7323)
ef7dc45dd2 is described below

commit ef7dc45dd24a6241fa748917205aca858f5c1c1b
Author: Mario Ivanac <48...@users.noreply.github.com>
AuthorDate: Wed Jun 29 22:38:28 2022 +0200

    GEODE-9997: added ParallelQueueSetPossibleDuplicateMessage (#7323)
    
    * GEODE-9997: added ParallelQueueSetPossibleDuplicateMessage to signal duplicate events on secondary buckets
---
 .../codeAnalysis/sanctionedDataSerializables.txt   |   4 +
 .../internal/ParallelAsyncEventQueueImpl.java      |   3 +
 .../internal/SerialAsyncEventQueueImpl.java        |   3 +
 .../org/apache/geode/internal/DSFIDFactory.java    |   3 +
 .../internal/cache/AbstractBucketRegionQueue.java  |  63 ++++-
 .../apache/geode/internal/cache/BucketAdvisor.java |  13 ++
 .../apache/geode/internal/cache/BucketRegion.java  |  10 +
 .../geode/internal/cache/BucketRegionQueue.java    |  38 +++
 .../geode/internal/cache/GemFireCacheImpl.java     |  14 +-
 .../geode/internal/cache/PartitionedRegion.java    |  10 +
 .../sockets/command/GatewayReceiverCommand.java    |   4 +
 .../internal/cache/wan/AbstractGatewaySender.java  |  24 ++
 .../wan/AbstractGatewaySenderEventProcessor.java   | 201 ++++++++++++++--
 .../internal/cache/wan/GatewayReceiverStats.java   |  31 ++-
 .../internal/cache/wan/InternalGatewaySender.java  |   4 +
 .../wan/parallel/ParallelGatewaySenderQueue.java   |   7 +
 .../ParallelQueueSetPossibleDuplicateMessage.java  | 166 ++++++++++++++
 .../xmlcache/ParallelAsyncEventQueueCreation.java  |   3 +
 .../xmlcache/ParallelGatewaySenderCreation.java    |   3 +
 .../xmlcache/SerialAsyncEventQueueCreation.java    |   3 +
 .../xmlcache/SerialGatewaySenderCreation.java      |   3 +
 .../ParallelGatewaySenderQueueJUnitTest.java       |   6 +
 ...lQueueSetPossibleDuplicateMessageJUnitTest.java | 243 ++++++++++++++++++++
 .../serialization/DataSerializableFixedID.java     |   1 +
 .../geode/internal/cache/wan/WANTestBase.java      |   1 +
 ...tewaySenderCheckPossibleDuplicateDUnitTest.java | 255 +++++++++++++++++++++
 .../parallel/ParallelGatewaySenderImpl.java        |  11 +
 .../internal/serial/SerialGatewaySenderImpl.java   |   3 +
 28 files changed, 1108 insertions(+), 22 deletions(-)

diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 84309aac4f..c75044e381 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1925,6 +1925,10 @@ org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage,2
 fromData,15
 toData,15
 
+org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessage,2
+fromData,26
+toData,26
+
 org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation$DestroyMessage,2
 fromData,46
 toData,41
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
index 4afb51d872..d5395df9e9 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
@@ -114,6 +114,9 @@ public class ParallelAsyncEventQueueImpl extends AbstractGatewaySender {
     }
   }
 
+  @Override
+  public void prepareForStop() {}
+
   @Override
   public void stop() {
     getLifeCycleLock().writeLock().lock();
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
index 1713feff76..06e6e594e2 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
@@ -135,6 +135,9 @@ public class SerialAsyncEventQueueImpl extends AbstractGatewaySender {
     return eventProcessor;
   }
 
+  @Override
+  public void prepareForStop() {}
+
   @Override
   public void stop() {
     if (logger.isDebugEnabled()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index d11a9435b9..e6e3cb96cf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -400,6 +400,7 @@ import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackArgument;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.cache.wan.GatewaySenderQueueEntrySynchronizationOperation;
 import org.apache.geode.internal.cache.wan.parallel.ParallelQueueRemovalMessage;
+import org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage;
 import org.apache.geode.internal.cache.wan.serial.BatchDestroyOperation;
 import org.apache.geode.internal.serialization.DSFIDSerializer;
 import org.apache.geode.internal.serialization.DataSerializableFixedID;
@@ -985,6 +986,8 @@ public class DSFIDFactory implements DataSerializableFixedID {
     serializer.register(ABORT_BACKUP_REQUEST, AbortBackupRequest.class);
     serializer.register(HOST_AND_PORT, HostAndPort.class);
     serializer.register(DISTRIBUTED_PING_MESSAGE, DistributedPingMessage.class);
+    serializer.register(PARALLEL_QUEUE_SET_POSSIBLE_DUPLICATE_MESSAGE,
+        ParallelQueueSetPossibleDuplicateMessage.class);
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index 03339a3667..3d9272b71d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -14,8 +14,11 @@
  */
 package org.apache.geode.internal.cache;
 
+import static org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage.LOAD_FROM_TEMP_QUEUE;
+
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,14 +34,20 @@ import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.TimeoutException;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.versions.RegionVersionVector;
 import org.apache.geode.internal.cache.versions.VersionSource;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.cache.wan.GatewaySenderStats;
 import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage;
 import org.apache.geode.internal.offheap.OffHeapClearRequired;
+import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.internal.statistics.StatisticsClock;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.util.internal.GeodeGlossary;
@@ -218,11 +227,12 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
     if (queues != null) {
       ConcurrentParallelGatewaySenderQueue prq =
           (ConcurrentParallelGatewaySenderQueue) queues.toArray()[0];
-      // synchronized (prq.getBucketToTempQueueMap()) {
+
       BlockingQueue<GatewaySenderEventImpl> tempQueue = prq.getBucketTmpQueue(getId());
-      // .getBucketToTempQueueMap().get(getId());
       if (tempQueue != null && !tempQueue.isEmpty()) {
         synchronized (tempQueue) {
+          Map<String, Map<Integer, List<Object>>> regionToDuplicateEventsMap =
+              new HashMap<>();
           try {
             // ParallelQueueRemovalMessage checks for the key in BucketRegionQueue
             // and if not found there, it removes it from tempQueue. When tempQueue
@@ -235,6 +245,9 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
               try {
                 event.setPossibleDuplicate(true);
                 if (addToQueue(event.getShadowKey(), event)) {
+                  if (notifyDuplicateSupported()) {
+                    addDuplicateEvent(regionToDuplicateEventsMap, event);
+                  }
                   event = null;
                 }
               } catch (ForceReattemptException e) {
@@ -257,13 +270,57 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
             }
             getInitializationLock().writeLock().unlock();
           }
+          notifyDuplicateEvents(regionToDuplicateEventsMap);
         }
       }
+    }
+  }
+
+  private boolean notifyDuplicateSupported() {
+    return !(this.getPartitionedRegion().getParallelGatewaySender().getEventProcessor()
+        .getDispatcher() instanceof GatewaySenderEventCallbackDispatcher);
+  }
+
+  private void notifyDuplicateEvents(
+      Map<String, Map<Integer, List<Object>>> regionToDuplicateEventsMap) {
+    if (regionToDuplicateEventsMap.isEmpty()) {
+      return;
+    }
+    if (getPartitionedRegion().getRegionAdvisor() == null) {
+      return;
+    }
+
+    Set<InternalDistributedMember> recipients =
+        getPartitionedRegion().getRegionAdvisor().adviseDataStore();
+
+    if (recipients.isEmpty()) {
+      return;
+    }
 
-      // }
+    InternalDistributedSystem ids = getCache().getInternalDistributedSystem();
+    DistributionManager dm = ids.getDistributionManager();
+    dm.retainMembersWithSameOrNewerVersion(recipients, KnownVersion.GEODE_1_15_0);
+
+    if (!recipients.isEmpty()) {
+      ParallelQueueSetPossibleDuplicateMessage possibleDuplicateMessage =
+          new ParallelQueueSetPossibleDuplicateMessage(LOAD_FROM_TEMP_QUEUE,
+              regionToDuplicateEventsMap);
+      possibleDuplicateMessage.setRecipients(recipients);
+      dm.putOutgoing(possibleDuplicateMessage);
     }
   }
 
+  private void addDuplicateEvent(Map<String, Map<Integer, List<Object>>> regionToDuplicateEventsMap,
+      GatewaySenderEventImpl event) {
+    Map<Integer, List<Object>> bucketIdToDispatchedKeys = regionToDuplicateEventsMap
+        .computeIfAbsent(getPartitionedRegion().getFullPath(), k -> new HashMap<>());
+
+    List<Object> dispatchedKeys =
+        bucketIdToDispatchedKeys.computeIfAbsent(getId(), k -> new ArrayList<>());
+
+    dispatchedKeys.add(event.getShadowKey());
+  }
+
   @Override
   public void forceSerialized(EntryEventImpl event) {
     // NOOP since we want the value in the region queue to stay in object form.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index e6dcd3fb8f..5c3e98dfbf 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -164,6 +164,8 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
 
   private BucketAdvisor startingBucketAdvisor;
 
+  private volatile boolean hasBecomePrimary = false;
+
   private final PartitionedRegion pRegion;
 
   final ConcurrentMap<String, Boolean> destroyedShadowBuckets = new ConcurrentHashMap<>();
@@ -498,6 +500,13 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
   }
 
 
+  BucketAdvisor getParentAdvisor() {
+    return parentAdvisor;
+  }
+
+  boolean getHasBecomePrimary() {
+    return hasBecomePrimary;
+  }
 
   /**
    * Called by the RegionAdvisor.profileRemoved, this method tests to see if the missing member is
@@ -1153,6 +1162,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
     try {
       synchronized (this) {
         if (isHosting() && (isVolunteering() || isBecomingPrimary())) {
+          hasBecomePrimary = isBecomingPrimary();
           Bucket br = regionAdvisor.getBucket(getBucket().getId());
           if (br instanceof BucketRegion) {
             ((BucketRegion) br).beforeAcquiringPrimaryState();
@@ -1167,6 +1177,8 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
             if (hasPrimary() && isPrimary()) {
               shouldInvokeListeners = true;
             }
+          } else {
+            hasBecomePrimary = false;
           }
         }
       }
@@ -2159,6 +2171,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
   private void changeFromPrimaryTo(byte requestedState) {
     try {
       primaryState = requestedState;
+      hasBecomePrimary = false;
     } finally {
       getPartitionedRegionStats().incPrimaryBucketCount(-1);
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index ffb1109115..8991441a5a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -217,6 +217,8 @@ public class BucketRegion extends DistributedRegion implements Bucket {
     }
   }
 
+  private boolean receivedGatewaySenderStoppedMessage = false;
+
   private final int redundancy;
 
   /** the partitioned region to which this bucket belongs */
@@ -2535,4 +2537,12 @@ public class BucketRegion extends DistributedRegion implements Bucket {
     return getSystem().getDistributionManager().getOtherDistributionManagerIds();
   }
 
+  public boolean isReceivedGatewaySenderStoppedMessage() {
+    return receivedGatewaySenderStoppedMessage;
+  }
+
+  public void setReceivedGatewaySenderStoppedMessage(boolean notified) {
+    receivedGatewaySenderStoppedMessage = notified;
+  }
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index aba62d4d2a..8fb572c236 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -43,6 +43,7 @@ import org.apache.geode.internal.cache.persistence.query.mock.ByteComparator;
 import org.apache.geode.internal.cache.versions.RegionVersionVector;
 import org.apache.geode.internal.cache.versions.VersionSource;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.cache.wan.parallel.BucketRegionQueueUnavailableException;
 import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
@@ -202,6 +203,23 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
 
   @Override
   public void beforeAcquiringPrimaryState() {
+    PartitionedRegion region = getPartitionedRegion();
+
+    if (region != null && region.getParallelGatewaySender() != null) {
+      AbstractGatewaySenderEventProcessor ep =
+          region.getParallelGatewaySender().getEventProcessor();
+
+      if (ep != null && !(ep.getDispatcher() instanceof GatewaySenderEventCallbackDispatcher)) {
+        if (isReceivedGatewaySenderStoppedMessage()) {
+          setReceivedGatewaySenderStoppedMessage(false);
+          return;
+        }
+        BucketAdvisor parent = getParentAdvisor(getBucketAdvisor());
+        if (parent.getHasBecomePrimary()) {
+          return;
+        }
+      }
+    }
     markAsDuplicate.addAll(eventSeqNumDeque);
   }
 
@@ -660,4 +678,24 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
     }
   }
 
+  public void setAsPossibleDuplicate(Object key) {
+    Object object = optimalGet(key);
+    if (object != null) {
+      ((GatewaySenderEventImpl) object).setPossibleDuplicate(true);
+    }
+  }
+
+  public boolean checkIfQueueContainsKey(Object key) {
+    return eventSeqNumDeque.contains(key);
+  }
+
+  BucketAdvisor getParentAdvisor(BucketAdvisor advisor) {
+    BucketAdvisor parent = advisor.getParentAdvisor();
+    while (parent != null) {
+      advisor = parent;
+      parent = advisor.getParentAdvisor();
+    }
+    return advisor;
+
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index e385581fbf..71d30adbf6 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -233,6 +233,7 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.GatewaySenderAdvisor;
 import org.apache.geode.internal.cache.wan.GatewaySenderQueueEntrySynchronizationListener;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
 import org.apache.geode.internal.cache.wan.WANServiceProvider;
 import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
 import org.apache.geode.internal.cache.xmlcache.CacheServerCreation;
@@ -2182,9 +2183,20 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
         return false;
       }
 
+      boolean isDebugEnabled = logger.isDebugEnabled();
+
+      for (GatewaySender sender : allGatewaySenders) {
+        try {
+          ((InternalGatewaySender) sender).prepareForStop();
+        } catch (Exception exception) {
+          if (isDebugEnabled) {
+            logger.debug("When calling Prepare for stop gw sender, ignore exception " + exception);
+          }
+        }
+      }
+
       CLOSING_THREAD.set(Thread.currentThread());
       try {
-        boolean isDebugEnabled = logger.isDebugEnabled();
 
         // First close the ManagementService
         system.handleResourceEvent(ResourceEvent.CACHE_REMOVE, this);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 03dd3de027..896af32576 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -771,6 +771,8 @@ public class PartitionedRegion extends LocalRegion
 
   private boolean regionCreationNotified;
 
+  private boolean sentGatewaySenderStoppedMessage = false;
+
   public interface RegionAdvisorFactory {
     RegionAdvisor create(PartitionedRegion region);
   }
@@ -10170,4 +10172,12 @@ public class PartitionedRegion extends LocalRegion
   public boolean areRecoveriesInProgress() {
     return prStats.getRecoveriesInProgress() > 0;
   }
+
+  public boolean isSentGatewaySenderStoppedMessage() {
+    return sentGatewaySenderStoppedMessage;
+  }
+
+  public void setSentGatewaySenderStoppedMessage(boolean notified) {
+    sentGatewaySenderStoppedMessage = notified;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
index 14e3ccf061..f133bc51c9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
@@ -186,6 +186,10 @@ public class GatewayReceiverCommand extends BaseCommand {
           }
           boolean possibleDuplicate = possibleDuplicatePartBytes[0] == 0x01;
 
+          if (possibleDuplicate) {
+            stats.incPossibleDuplicateEventsReceived();
+          }
+
           // Retrieve the region name from the message parts
           Part regionNamePart = clientMessage.getPart(partNumber + 2);
           String regionName = regionNamePart.getCachedString();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index e5fa44a83e..bd1a898da4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -585,6 +585,9 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di
   @Override
   public abstract void startWithCleanQueue();
 
+  @Override
+  public abstract void prepareForStop();
+
   @Override
   public abstract void stop();
 
@@ -1298,6 +1301,27 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di
     }
   }
 
+  public boolean markAsDuplicateInTempQueueEvents(Object tailKey) {
+    synchronized (queuedEventsSync) {
+      final boolean isDebugEnabled = logger.isDebugEnabled();
+
+      for (TmpQueueEvent event : tmpQueuedEvents) {
+        if (tailKey.equals(event.getEvent().getTailKey())) {
+          if (isDebugEnabled) {
+            logger.debug(
+                "shadowKey {} is found in tmpQueueEvents at AbstractGatewaySender level. Marking it..",
+                tailKey);
+          }
+          event.getEvent().setPossibleDuplicate(true);
+          return true;
+        }
+      }
+
+      return false;
+    }
+  }
+
+
   /**
    * During sender is getting stopped, if there are any cache operation on queue then that event
    * will be stored in temp queue. Once sender is started, these event from tmp queue will be
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index fbdc1d9c2c..c7413aa91f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -18,6 +18,9 @@ import static java.lang.Boolean.TRUE;
 import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.EXCLUDE;
 import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE;
 import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE_LAST_EVENT;
+import static org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage.RESET_BATCH;
+import static org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage.STOPPED_GATEWAY_SENDER;
+import static org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage.UNSUCCESSFULLY_DISPATCHED;
 import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 
 import java.io.IOException;
@@ -29,10 +32,12 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
@@ -46,7 +51,11 @@ import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.wan.GatewayEventFilter;
 import org.apache.geode.cache.wan.GatewayQueueEvent;
 import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.ColocationHelper;
 import org.apache.geode.internal.cache.Conflatable;
 import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.EntryEventImpl;
@@ -58,8 +67,10 @@ import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
 import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
+import org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage;
 import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderQueue;
 import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.logging.internal.executors.LoggingThread;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.pdx.internal.PeerTypeRegistration;
@@ -469,7 +480,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
           boolean interrupted = Thread.interrupted();
           try {
             if (resetLastPeekedEvents) {
-              pendingEventsInBatchesMarkAsPossibleDuplicate();
+              notifyPossibleDuplicate(RESET_BATCH, pendingEventsInBatches());
               resetLastPeekedEvents();
               resetLastPeekedEvents = false;
             }
@@ -967,16 +978,8 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
   private void handleUnSuccessfulBatchDispatch(List<?> events) {
     final GatewaySenderStats statistics = sender.getStatistics();
     statistics.incBatchesRedistributed();
-
     // Set posDup flag on each event in the batch
-    Iterator<?> it = events.iterator();
-    while (it.hasNext() && !isStopped) {
-      Object o = it.next();
-      if (o instanceof GatewaySenderEventImpl) {
-        GatewaySenderEventImpl ge = (GatewaySenderEventImpl) o;
-        ge.setPossibleDuplicate(true);
-      }
-    }
+    notifyPossibleDuplicate(UNSUCCESSFULLY_DISPATCHED, events);
   }
 
   /**
@@ -1217,7 +1220,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
         logger.warn("Destroying GatewayEventDispatcher with actively queued data.");
       }
       if (resetLastPeekedEvents) {
-        pendingEventsInBatchesMarkAsPossibleDuplicate();
+        notifyPossibleDuplicate(STOPPED_GATEWAY_SENDER, pendingEventsInBatches());
         resetLastPeekedEvents();
         resetLastPeekedEvents = false;
       }
@@ -1322,17 +1325,181 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
 
   protected abstract void enqueueEvent(GatewayQueueEvent<?, ?> event);
 
-  private void pendingEventsInBatchesMarkAsPossibleDuplicate() {
+  private void notifyPossibleDuplicate(int reason, List<?> events) {
+    Map<String, Map<Integer, List<Object>>> regionToDispatchedKeysMap = new HashMap<>();
+    boolean pgwsender = (getSender().isParallel()
+        && !(getDispatcher() instanceof GatewaySenderEventCallbackDispatcher));
+
+    for (Object o : events) {
+      if (o instanceof GatewaySenderEventImpl) {
+        GatewaySenderEventImpl ge = (GatewaySenderEventImpl) o;
+        if (!ge.getPossibleDuplicate()) {
+          if (pgwsender) {
+            addDuplicateEvent(regionToDispatchedKeysMap, ge);
+          }
+          ge.setPossibleDuplicate(true);
+        }
+      }
+    }
+
+    if (!pgwsender) {
+      return;
+    }
+
+    PartitionedRegion queueRegion;
+    if (queue instanceof ConcurrentParallelGatewaySenderQueue) {
+      queueRegion =
+          (PartitionedRegion) ((ConcurrentParallelGatewaySenderQueue) queue).getRegion();
+    } else {
+      queueRegion =
+          (PartitionedRegion) ((ParallelGatewaySenderQueue) queue).getRegion();
+    }
+
+    if (queueRegion == null || queueRegion.getRegionAdvisor() == null
+        || queueRegion.getDataStore() == null) {
+      return;
+    }
+
+    if (reason == STOPPED_GATEWAY_SENDER) {
+      final Set<Integer> buckets = queueRegion.getDataStore().getAllLocalPrimaryBucketIds();
+      if (regionToDispatchedKeysMap.isEmpty()) {
+        if (queueRegion.isSentGatewaySenderStoppedMessage()) {
+          return;
+        }
+        Map<Integer, List<Object>> bucketIdToDispatchedKeys = new HashMap<>();
+        for (Integer bId : buckets) {
+          bucketIdToDispatchedKeys.put(bId, Collections.emptyList());
+        }
+        regionToDispatchedKeysMap.put(queueRegion.getFullPath(), bucketIdToDispatchedKeys);
+
+      } else {
+        Map<Integer, List<Object>> bucketIdToDispatchedKeys =
+            regionToDispatchedKeysMap.get(queueRegion.getFullPath());
+        if (bucketIdToDispatchedKeys == null) {
+          return;
+        }
+        for (Integer bId : buckets) {
+          bucketIdToDispatchedKeys.putIfAbsent(bId, Collections.emptyList());
+        }
+      }
+    }
+
+    if (regionToDispatchedKeysMap.size() > 0) {
+      Set<InternalDistributedMember> recipients =
+          getAllRecipients(sender.getCache(), regionToDispatchedKeysMap);
+
+      if (recipients.isEmpty()) {
+        return;
+      }
+
+      if (reason == STOPPED_GATEWAY_SENDER) {
+        if (!queueRegion.isSentGatewaySenderStoppedMessage()) {
+          queueRegion.setSentGatewaySenderStoppedMessage(true);
+        }
+      }
+
+      InternalDistributedSystem ids = sender.getCache().getInternalDistributedSystem();
+      DistributionManager dm = ids.getDistributionManager();
+      dm.retainMembersWithSameOrNewerVersion(recipients, KnownVersion.GEODE_1_15_0);
+
+      if (!recipients.isEmpty()) {
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "notifyPossibleDuplicate send ParallelQueueSetPossibleDuplicateMessage recipients {}.",
+              recipients);
+        }
+
+        ParallelQueueSetPossibleDuplicateMessage pqspdm =
+            new ParallelQueueSetPossibleDuplicateMessage(reason, regionToDispatchedKeysMap);
+        pqspdm.setRecipients(recipients);
+        dm.putOutgoing(pqspdm);
+      }
+    }
+
+  }
+
+  protected void addDuplicateEvent(
+      Map<String, Map<Integer, List<Object>>> regionToDispatchedKeysMap,
+      GatewaySenderEventImpl event) {
+    PartitionedRegion prQ = null;
+    int bucketId = -1;
+    Object key = null;
+    InternalCache cache = sender.getCache();
+    String regionPath = event.getRegionPath();
+
+    if (event.getRegion() != null) {
+      if (cache.getRegion(regionPath) instanceof DistributedRegion) {
+        prQ = ((ParallelGatewaySenderQueue) getQueue()).getRegion(event.getRegion().getFullPath());
+        bucketId = event.getEventId().getBucketID();
+        key = event.getEventId();
+      } else {
+        prQ = ((ParallelGatewaySenderQueue) getQueue()).getRegion(ColocationHelper
+            .getLeaderRegion((PartitionedRegion) event.getRegion()).getFullPath());
+        bucketId = event.getBucketId();
+        key = event.getShadowKey();
+      }
+    } else {
+      Region region = (PartitionedRegion) cache.getRegion(regionPath);
+      if (region != null && !region.isDestroyed()) {
+        if (region instanceof DistributedRegion) {
+          prQ = ((ParallelGatewaySenderQueue) getQueue()).getRegion(region.getFullPath());
+          bucketId = event.getBucketId();
+          key = event.getEventId();
+        } else {
+          prQ = ((ParallelGatewaySenderQueue) getQueue()).getRegion(
+              ColocationHelper.getLeaderRegion((PartitionedRegion) region).getFullPath());
+          bucketId = event.getBucketId();
+          key = event.getShadowKey();
+        }
+      }
+    }
+
+    if (prQ == null) {
+      return;
+    }
+
+    Map<Integer, List<Object>> bucketIdToDispatchedKeys =
+        regionToDispatchedKeysMap.get(prQ.getFullPath());
+    if (bucketIdToDispatchedKeys == null) {
+      bucketIdToDispatchedKeys = new ConcurrentHashMap<>();
+      regionToDispatchedKeysMap.put(prQ.getFullPath(), bucketIdToDispatchedKeys);
+    }
+
+    List<Object> dispatchedKeys = bucketIdToDispatchedKeys.get(bucketId);
+    if (dispatchedKeys == null) {
+      dispatchedKeys = new ArrayList<>();
+      bucketIdToDispatchedKeys.put(bucketId, dispatchedKeys);
+    }
+    dispatchedKeys.add(key);
+
+  }
+
+  public void prepareForStopProcessing() {
+    notifyPossibleDuplicate(STOPPED_GATEWAY_SENDER, pendingEventsInBatches());
+  }
+
+  private Set<InternalDistributedMember> getAllRecipients(InternalCache cache,
+      Map<String, Map<Integer, List<Object>>> map) {
+    Set<InternalDistributedMember> recipients = new ObjectOpenHashSet<>();
+    for (Object pr : map.keySet()) {
+      PartitionedRegion partitionedRegion = (PartitionedRegion) cache.getRegion((String) pr);
+      if (partitionedRegion != null && partitionedRegion.getRegionAdvisor() != null) {
+        recipients.addAll(partitionedRegion.getRegionAdvisor().adviseDataStore());
+      }
+    }
+    return recipients;
+  }
+
+
+  private List<GatewaySenderEventImpl> pendingEventsInBatches() {
+    List<GatewaySenderEventImpl> pendingEvents = new ArrayList<>();
     if (!batchIdToEventsMap.isEmpty()) {
       for (Map.Entry<Integer, List<GatewaySenderEventImpl>[]> entry : batchIdToEventsMap
           .entrySet()) {
-        for (GatewaySenderEventImpl event : entry.getValue()[0]) {
-          if (!event.getPossibleDuplicate()) {
-            event.setPossibleDuplicate(true);
-          }
-        }
+        pendingEvents.addAll(entry.getValue()[0]);
       }
     }
+    return pendingEvents;
   }
 
   protected static class SenderStopperCallable implements Callable<Boolean> {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java
index 73af7b0b7b..e5764e4151 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java
@@ -32,6 +32,12 @@ public class GatewayReceiverStats extends CacheServerStats {
    */
   private static final String DUPLICATE_BATCHES_RECEIVED = "duplicateBatchesReceived";
 
+  /**
+   * Name of the statistic for the number of possible duplicate events received
+   */
+  private static final String POSSIBLE_DUPLICATE_EVENTS_RECEIVED =
+      "possibleduplicateEventsReceived";
+
   /**
    * Name of the event queue time statistic
    */
@@ -86,6 +92,11 @@ public class GatewayReceiverStats extends CacheServerStats {
    */
   private final int duplicateBatchesReceivedId;
 
+  /**
+   * Id of the statistic for the number of possible duplicate events received
+   */
+  private final int possibleduplicateEventsReceivedId;
+
   /**
    * Id of the event queue time statistic
    */
@@ -159,7 +170,11 @@ public class GatewayReceiverStats extends CacheServerStats {
         f.createLongCounter(EXCEPTIONS_OCCURRED,
             "number of exceptions occurred while porcessing the batches", "operations"),
         f.createLongCounter(EVENTS_RETRIED,
-            "total number events retried by this GatewayReceiver due to exceptions", "operations")};
+            "total number events retried by this GatewayReceiver due to exceptions", "operations"),
+        f.createLongCounter(POSSIBLE_DUPLICATE_EVENTS_RECEIVED,
+            "total number of possible duplicate events received by this GatewayReceiver",
+            "operations")
+    };
     return new GatewayReceiverStats(f, ownerName, typeName, descriptors, meterRegistry);
 
   }
@@ -178,6 +193,7 @@ public class GatewayReceiverStats extends CacheServerStats {
     unknowsOperationsReceivedId = statType.nameToId(UNKNOWN_OPERATIONS_RECEIVED);
     exceptionsOccurredId = statType.nameToId(EXCEPTIONS_OCCURRED);
     eventsRetriedId = statType.nameToId(EVENTS_RETRIED);
+    possibleduplicateEventsReceivedId = statType.nameToId(POSSIBLE_DUPLICATE_EVENTS_RECEIVED);
 
     this.meterRegistry = meterRegistry;
     eventsReceivedCounter = LegacyStatCounter.builder(EVENTS_RECEIVED_COUNTER_NAME)
@@ -198,6 +214,19 @@ public class GatewayReceiverStats extends CacheServerStats {
     return stats.getLong(duplicateBatchesReceivedId);
   }
 
+
+  /**
+   * Increments the number of possible duplicate events received by 1.
+   */
+  public void incPossibleDuplicateEventsReceived() {
+    stats.incLong(possibleduplicateEventsReceivedId, 1);
+  }
+
+  public long getPossibleDuplicateEventsReceived() {
+    return stats.getLong(possibleduplicateEventsReceivedId);
+  }
+
+
   /**
    * Increments the number of out of order batches received by 1.
    */
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java
index 13e36e7f9b..c4f48b7dd6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java
@@ -49,4 +49,8 @@ public interface InternalGatewaySender extends GatewaySender {
   void setStartEventProcessorInPausedState();
 
   int getEventQueueSize();
+
+
+  void prepareForStop();
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 678286c92b..c37b4aefc8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -593,6 +593,13 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     } finally {
       if (prQ != null) {
         userRegionNameToShadowPRMap.put(userPR.getFullPath(), prQ);
+        prQ.setSentGatewaySenderStoppedMessage(false);
+        if (prQ.getDataStore() != null) {
+          final Set<BucketRegion> buckets = prQ.getDataStore().getAllLocalBucketRegions();
+          for (BucketRegion br : buckets) {
+            br.setReceivedGatewaySenderStoppedMessage(false);
+          }
+        }
       }
       /*
        * Here, enqueueTempEvents need to be invoked when a sender is already running and userPR is
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessage.java
new file mode 100644
index 0000000000..d1c2de3166
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessage.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.PooledDistributionMessage;
+import org.apache.geode.internal.cache.BucketRegionQueue;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.LocalRegion.InitializationLevel;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+/**
+ * Sets events in the remote secondary queues to possible duplicate
+ *
+ * @since Geode 1.15
+ */
+public class ParallelQueueSetPossibleDuplicateMessage extends PooledDistributionMessage {
+
+  private static final Logger logger = LogService.getLogger();
+
+  private int reason;
+  private Map<String, Map<Integer, List<Object>>> regionToDuplicateEventsMap;
+
+  public static final int UNSUCCESSFULLY_DISPATCHED = 0;
+  public static final int RESET_BATCH = 1;
+  public static final int LOAD_FROM_TEMP_QUEUE = 2;
+  public static final int STOPPED_GATEWAY_SENDER = 3;
+
+
+  public ParallelQueueSetPossibleDuplicateMessage() {}
+
+  public ParallelQueueSetPossibleDuplicateMessage(int reason,
+      Map<String, Map<Integer, List<Object>>> regionToDuplicateEventsMap) {
+    this.reason = reason;
+    this.regionToDuplicateEventsMap = regionToDuplicateEventsMap;
+  }
+
+  @Override
+  public int getDSFID() {
+    return PARALLEL_QUEUE_SET_POSSIBLE_DUPLICATE_MESSAGE;
+  }
+
+  @Override
+  public String toString() {
+    String cname = getShortClassName();
+    final StringBuilder sb = new StringBuilder(cname);
+    sb.append("reason=").append(reason);
+    sb.append(" regionToDispatchedKeysMap=").append(regionToDuplicateEventsMap);
+    sb.append(" sender=").append(getSender());
+    return sb.toString();
+  }
+
+  @Override
+  protected void process(ClusterDistributionManager dm) {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    final InternalCache cache = dm.getCache();
+
+    if (cache == null) {
+      return;
+    }
+    final InitializationLevel oldLevel =
+        LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE);
+    try {
+      for (String name : regionToDuplicateEventsMap.keySet()) {
+        final PartitionedRegion region = (PartitionedRegion) cache.getRegion(name);
+        if (region == null) {
+          continue;
+        }
+
+        AbstractGatewaySender abstractSender = region.getParallelGatewaySender();
+        // Find the map: bucketId to dispatchedKeys
+        // Find the bucket
+        // Mark the keys as possible duplicates
+        Map<Integer, List<Object>> bucketIdToDispatchedKeys =
+            this.regionToDuplicateEventsMap.get(name);
+        for (Integer bId : bucketIdToDispatchedKeys.keySet()) {
+          final String bucketFullPath =
+              SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME + SEPARATOR
+                  + region.getBucketName(bId);
+          BucketRegionQueue brq =
+              (BucketRegionQueue) cache.getInternalRegionByPath(bucketFullPath);
+
+          if (brq != null && reason == STOPPED_GATEWAY_SENDER) {
+            brq.setReceivedGatewaySenderStoppedMessage(true);
+          }
+
+          if (isDebugEnabled) {
+            logger.debug(
+                "ParallelQueueSetPossibleDuplicateMessage : The bucket in the cache is bucketRegionName : {} bucket: {}",
+                bucketFullPath, brq);
+          }
+
+          List<Object> dispatchedKeys = bucketIdToDispatchedKeys.get(bId);
+          if (dispatchedKeys != null && !dispatchedKeys.isEmpty()) {
+            for (Object key : dispatchedKeys) {
+              // First, mark the event as a possible duplicate in tempQueueEvents at the
+              // AbstractGatewaySender level, if it exists there
+              // synchronize on AbstractGatewaySender.queuedEventsSync while doing so
+              abstractSender.markAsDuplicateInTempQueueEvents(key);
+
+              if (brq != null) {
+                if (isDebugEnabled) {
+                  logger.debug(
+                      "ParallelQueueSetPossibleDuplicateMessage : The bucket {} key {}.",
+                      brq, key);
+                }
+
+                if (brq.checkIfQueueContainsKey(key)) {
+                  brq.setAsPossibleDuplicate(key);
+                }
+              }
+            }
+          }
+        }
+      } //
+    } finally {
+      LocalRegion.setThreadInitLevelRequirement(oldLevel);
+    }
+  }
+
+  @Override
+  public void toData(DataOutput out,
+      SerializationContext context) throws IOException {
+    super.toData(out, context);
+    DataSerializer.writeInteger(this.reason, out);
+    DataSerializer.writeHashMap(this.regionToDuplicateEventsMap, out);
+  }
+
+  @Override
+  public void fromData(DataInput in,
+      DeserializationContext context) throws IOException, ClassNotFoundException {
+    super.fromData(in, context);
+    this.reason = DataSerializer.readInteger(in);
+    this.regionToDuplicateEventsMap = DataSerializer.readHashMap(in);
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
index b299925fbd..b9433dcbc6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
@@ -48,6 +48,9 @@ public class ParallelAsyncEventQueueCreation extends AbstractGatewaySender
   @Override
   public void startWithCleanQueue() {}
 
+  @Override
+  public void prepareForStop() {}
+
   @Override
   public void stop() {}
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
index 82adbee5dd..561f0b4634 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
@@ -48,6 +48,9 @@ public class ParallelGatewaySenderCreation extends AbstractGatewaySender impleme
   @Override
   public void startWithCleanQueue() {}
 
+  @Override
+  public void prepareForStop() {}
+
   @Override
   public void stop() {}
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
index 4b44799e06..834f6ad474 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
@@ -47,6 +47,9 @@ public class SerialAsyncEventQueueCreation extends AbstractGatewaySender impleme
   @Override
   public void startWithCleanQueue() {}
 
+  @Override
+  public void prepareForStop() {}
+
   @Override
   public void stop() {}
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
index b0f02ceb91..a239fd9972 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
@@ -48,6 +48,9 @@ public class SerialGatewaySenderCreation extends AbstractGatewaySender implement
   @Override
   public void startWithCleanQueue() {}
 
+  @Override
+  public void prepareForStop() {}
+
   @Override
   public void stop() {}
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
index 92d1601d8b..7e8000f0d1 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
@@ -556,10 +556,13 @@ public class ParallelGatewaySenderQueueJUnitTest {
     targetRs.add(region);
 
     PartitionedRegion shadowRegion = mock(PartitionedRegion.class);
+    PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class);
 
     when(regionFactory.create(any())).thenReturn(shadowRegion);
 
     when(shadowRegion.getFullPath()).thenReturn("_PARALLEL_GATEWAY_SENDER_QUEUE");
+    when(shadowRegion.getDataStore()).thenReturn(dataStore);
+    when(dataStore.getAllLocalBucketRegions()).thenReturn(Collections.emptySet());
 
     mockGatewaySenderStats();
 
@@ -624,10 +627,13 @@ public class ParallelGatewaySenderQueueJUnitTest {
     targetRs.add(region);
 
     PartitionedRegion shadowRegion = mock(PartitionedRegion.class);
+    PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class);
 
     when(regionFactory.create(any())).thenReturn(shadowRegion);
 
     when(shadowRegion.getFullPath()).thenReturn("_PARALLEL_GATEWAY_SENDER_QUEUE");
+    when(shadowRegion.getDataStore()).thenReturn(dataStore);
+    when(dataStore.getAllLocalBucketRegions()).thenReturn(Collections.emptySet());
 
     mockGatewaySenderStats();
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessageJUnitTest.java
new file mode 100644
index 0000000000..0b822ba361
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessageJUnitTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage.UNSUCCESSFULLY_DISPATCHED;
+import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.ToDoubleFunction;
+
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Timer;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.Statistics;
+import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DSClock;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.BucketRegionQueue;
+import org.apache.geode.internal.cache.BucketRegionQueueHelper;
+import org.apache.geode.internal.cache.CachePerfStats;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.GatewaySenderStats;
+import org.apache.geode.internal.statistics.DummyStatisticsFactory;
+import org.apache.geode.internal.statistics.StatisticsManager;
+
+public class ParallelQueueSetPossibleDuplicateMessageJUnitTest {
+
+  private static final String GATEWAY_SENDER_ID = "ny";
+  private static final int BUCKET_ID = 85;
+  private static final long KEY = 198;
+
+  private GemFireCacheImpl cache;
+  private PartitionedRegion queueRegion;
+  private AbstractGatewaySender sender;
+  private PartitionedRegion rootRegion;
+  private BucketRegionQueue bucketRegionQueue;
+  private BucketRegionQueueHelper bucketRegionQueueHelper;
+  private GatewaySenderStats stats;
+
+  @Before
+  public void setUpGemFire() {
+    createCache();
+    createQueueRegion();
+    createGatewaySender();
+    createRootRegion();
+    createBucketRegionQueue();
+  }
+
+  private void createCache() {
+    // Mock cache
+    cache = mock(GemFireCacheImpl.class);
+    DistributedSystem ds = mock(DistributedSystem.class);
+    InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+
+    MeterRegistry mr = mock(MeterRegistry.class);
+    StatisticsManager sm = mock(StatisticsManager.class);
+    ClusterDistributionManager dm = mock(ClusterDistributionManager.class);
+
+    when(cache.getDistributedSystem()).thenReturn(ds);
+    when(cache.getInternalDistributedSystem()).thenReturn(ids);
+    when(cache.getDistributionManager()).thenReturn(dm);
+
+    when(cache.getCachePerfStats()).thenReturn(mock(CachePerfStats.class));
+    when(cache.getMeterRegistry()).thenReturn(mr);
+    when(cache.getCancelCriterion()).thenReturn(mock(CancelCriterion.class));
+
+
+    cache.getCancelCriterion().checkCancelInProgress(null);
+
+    when(ds.createAtomicStatistics(any(), anyString())).thenReturn(mock(Statistics.class));
+    when(ids.getStatisticsManager()).thenReturn(sm);
+    when(ids.getClock()).thenReturn(mock(DSClock.class));
+    when(ids.getDistributionManager()).thenReturn(dm);
+    when(ids.getDistributedMember()).thenReturn(mock(InternalDistributedMember.class));
+
+    when(mr.timer(any(), any(), any())).thenReturn(mock(Timer.class));
+    when(mr.gauge(anyString(), any(), any(ToDoubleFunction.class))).thenReturn(mock(Gauge.class));
+    when(mr.config()).thenReturn(mock(MeterRegistry.Config.class));
+
+    when(sm.createAtomicStatistics(any(), anyString())).thenReturn(mock(Statistics.class));
+
+    when(dm.getConfig()).thenReturn(mock(DistributionConfig.class));
+    when(dm.getCache()).thenReturn(cache);
+
+  }
+
+  private void createQueueRegion() {
+    // Mock queue region
+    queueRegion =
+        ParallelGatewaySenderHelper.createMockQueueRegion(cache,
+            ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID));
+  }
+
+  private void createGatewaySender() {
+    // Mock gateway sender
+    sender = ParallelGatewaySenderHelper.createGatewaySender(cache);
+    when(queueRegion.getParallelGatewaySender()).thenReturn(sender);
+    when(sender.getQueues()).thenReturn(null);
+    when(sender.getDispatcherThreads()).thenReturn(1);
+    stats = new GatewaySenderStats(new DummyStatisticsFactory(), "gatewaySenderStats-", "ln",
+        disabledClock());
+    when(sender.getStatistics()).thenReturn(stats);
+  }
+
+  private void createRootRegion() {
+    // Mock root region
+    rootRegion = mock(PartitionedRegion.class);
+    when(rootRegion.getFullPath())
+        .thenReturn(SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME);
+    when(cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true))
+        .thenReturn(rootRegion);
+    when(cache.getRegion(ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID)))
+        .thenReturn(queueRegion);
+  }
+
+  private void createBucketRegionQueue() {
+    // Create BucketRegionQueue
+    BucketRegionQueue realBucketRegionQueue = ParallelGatewaySenderHelper
+        .createBucketRegionQueue(cache, rootRegion, queueRegion, BUCKET_ID);
+    bucketRegionQueue = spy(realBucketRegionQueue);
+
+    bucketRegionQueueHelper =
+        new BucketRegionQueueHelper(cache, queueRegion, bucketRegionQueue);
+  }
+
+  @Test
+  public void validateSetPossibleDuplicateKeyInUninitializedBucketRegionQueue() throws Exception {
+
+    assertThat(bucketRegionQueue.isInitialized()).isFalse();
+
+    // Create a ParallelGatewaySenderEventProcessor for the sender
+    ParallelGatewaySenderEventProcessor processor =
+        ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(sender);
+    GatewaySenderEventImpl gsEvent = mock(GatewaySenderEventImpl.class);
+
+    // Add a mock GatewaySenderEventImpl to the temp queue
+    BlockingQueue<GatewaySenderEventImpl> tempQueue =
+        createTempQueueAndAddEvent(processor, gsEvent);
+    assertThat(tempQueue.size()).isEqualTo(1);
+
+    createAndProcessParallelQueueSetPossibleDuplicateMessage();
+
+    verify(sender, times(1)).markAsDuplicateInTempQueueEvents(KEY);
+    verify(bucketRegionQueue, times(0)).setAsPossibleDuplicate(KEY);
+
+  }
+
+  @Test
+  public void validateSetPossibleDuplicateKeyInInitializedBucketRegionQueue() throws Exception {
+
+    assertThat(bucketRegionQueue.isInitialized()).isFalse();
+
+    // Create a ParallelGatewaySenderEventProcessor for the sender
+    ParallelGatewaySenderEventProcessor processor =
+        ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(sender);
+    GatewaySenderEventImpl gsEvent = mock(GatewaySenderEventImpl.class);
+
+    // Add a mock GatewaySenderEventImpl to the temp queue
+    BlockingQueue<GatewaySenderEventImpl> tempQueue =
+        createTempQueueAndAddEvent(processor, gsEvent);
+    assertThat(tempQueue.size()).isEqualTo(1);
+
+    // Clean up destroyed tokens and mark GII complete so the bucket queue is initialized
+    bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
+
+    assertThat(bucketRegionQueue.isInitialized()).isTrue();
+
+    bucketRegionQueue.pushKeyIntoQueue(KEY);
+
+    createAndProcessParallelQueueSetPossibleDuplicateMessage();
+
+    verify(sender, times(1)).markAsDuplicateInTempQueueEvents(KEY);
+    verify(bucketRegionQueue, times(1)).setAsPossibleDuplicate(KEY);
+
+  }
+
+  private void createAndProcessParallelQueueSetPossibleDuplicateMessage() {
+    ParallelQueueSetPossibleDuplicateMessage message =
+        new ParallelQueueSetPossibleDuplicateMessage(UNSUCCESSFULLY_DISPATCHED,
+            createRegionToDispatchedKeysMap());
+    message.process((ClusterDistributionManager) cache.getDistributionManager());
+  }
+
+  private HashMap<String, Map<Integer, List<Object>>> createRegionToDispatchedKeysMap() {
+    HashMap<String, Map<Integer, List<Object>>> regionToDispatchedKeys = new HashMap<>();
+    Map<Integer, List<Object>> bucketIdToDispatchedKeys = new HashMap<>();
+    List<Object> dispatchedKeys = new ArrayList<>();
+    dispatchedKeys.add(KEY);
+    bucketIdToDispatchedKeys.put(BUCKET_ID, dispatchedKeys);
+    regionToDispatchedKeys.put(ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID),
+        bucketIdToDispatchedKeys);
+    return regionToDispatchedKeys;
+  }
+
+  private BlockingQueue<GatewaySenderEventImpl> createTempQueueAndAddEvent(
+      ParallelGatewaySenderEventProcessor processor, GatewaySenderEventImpl event) {
+    ParallelGatewaySenderQueue queue = (ParallelGatewaySenderQueue) processor.getQueue();
+    Map<Integer, BlockingQueue<GatewaySenderEventImpl>> tempQueueMap =
+        queue.getBucketToTempQueueMap();
+    BlockingQueue<GatewaySenderEventImpl> tempQueue = new LinkedBlockingQueue<>();
+    when(event.getShadowKey()).thenReturn(KEY);
+    tempQueue.add(event);
+    tempQueueMap.put(BUCKET_ID, tempQueue);
+    return tempQueue;
+  }
+}
diff --git a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
index bf7ff09380..a7c9e45b11 100644
--- a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
+++ b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
@@ -683,6 +683,7 @@ public interface DataSerializableFixedID extends SerializationVersions, BasicSer
   short ABORT_BACKUP_REQUEST = 2183;
   short MEMBER_IDENTIFIER = 2184;
   short HOST_AND_PORT = 2185;
+  short PARALLEL_QUEUE_SET_POSSIBLE_DUPLICATE_MESSAGE = 2186;
 
   // NOTE, codes > 65535 will take 4 bytes to serialize
 
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 7461bffe3d..2969a87296 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -1281,6 +1281,7 @@ public class WANTestBase extends DistributedTestCase {
     statsList.add(gatewayReceiverStats.getOutoforderBatchesReceived());
     statsList.add(gatewayReceiverStats.getEarlyAcks());
     statsList.add(gatewayReceiverStats.getExceptionsOccurred());
+    statsList.add(gatewayReceiverStats.getPossibleDuplicateEventsReceived());
     return statsList;
   }
 
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderCheckPossibleDuplicateDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderCheckPossibleDuplicateDUnitTest.java
new file mode 100644
index 0000000000..f9248ef7d7
--- /dev/null
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderCheckPossibleDuplicateDUnitTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.SerializableRunnableIF;
+import org.apache.geode.test.junit.categories.WanTest;
+
+@Category({WanTest.class})
+public class ParallelWANPersistenceEnabledGatewaySenderCheckPossibleDuplicateDUnitTest
+    extends WANTestBase {
+
+  private static final long serialVersionUID = 2L;
+  private static final Logger logger = LogService.getLogger();
+
+  public ParallelWANPersistenceEnabledGatewaySenderCheckPossibleDuplicateDUnitTest() {
+    super();
+  }
+
+
+  private final int localId = 1;
+  private final int remoteId = 2;
+
+  @Override
+  protected final void postSetUpWANTestBase() throws Exception {
+    // The restart tests log this string
+    IgnoredException.addIgnoredException("failed accepting client connection");
+  }
+
+  /**
+   * When gateway senders start to unqueue, check that received events are
+   * not marked as possible duplicates.
+   */
+  @Test
+  public void testPersistentPartitionedRegionWithGatewaySenderCheckReceiverNoPossibleDuplicate()
+      throws InterruptedException {
+    int lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(localId));
+    int nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(remoteId, lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> setNumDispatcherThreadsForTheRun(5));
+    vm5.invoke(() -> setNumDispatcherThreadsForTheRun(5));
+
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, false));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, false));
+
+    vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+        100, isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+        100, isOffHeap()));
+
+
+    vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+        100, isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+        100, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
+
+    long vm2NumDuplicate = vm2.invoke(() -> WANTestBase.getReceiverStats().get(7));
+    long vm3NumDuplicate = vm3.invoke(() -> WANTestBase.getReceiverStats().get(7));
+
+    assertThat(vm2NumDuplicate + vm3NumDuplicate).isEqualTo(0);
+  }
+
+
+  /**
+   * When gateway senders start to unqueue, stop gateway sender, and check that some events are
+   * dispatched to receiving side,
+   * but events are not removed on sending side.
+   */
+  @Test
+  public void testPersistentPartitionedRegionWithGatewaySenderCheckReceiverPossibleDuplicate()
+      throws InterruptedException {
+    int lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(localId));
+    int nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(remoteId, lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> setNumDispatcherThreadsForTheRun(5));
+    vm5.invoke(() -> setNumDispatcherThreadsForTheRun(5));
+
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", remoteId, true, 100, 10, false, true, null, false));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", remoteId, true, 100, 10, false, true, null, false));
+
+    vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+        100, isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+        100, isOffHeap()));
+
+
+    vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+        100, isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+        100, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000));
+
+    createReceiverInVMs(vm2, vm3);
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
+
+    long vm2NumDuplicate = vm2.invoke(() -> WANTestBase.getReceiverStats().get(7));
+    long vm3NumDuplicate = vm3.invoke(() -> WANTestBase.getReceiverStats().get(7));
+
+    assertThat(vm2NumDuplicate + vm3NumDuplicate).isEqualTo(100);
+  }
+
+  @Test
+  public void testpersistentWanGateway_CheckReceiverPossibleDuplicate_afterSenderRestarted() {
+    int lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(localId));
+    int nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(remoteId, lnPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    // keep a larger batch to minimize number of exception occurrences in the log
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", remoteId, true, 100, 10, false, true, null, false));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", remoteId, true, 100, 10, false, true, null, false));
+    vm6.invoke(
+        () -> WANTestBase.createSender("ln", remoteId, true, 100, 10, false, true, null, false));
+    vm7.invoke(
+        () -> WANTestBase.createSender("ln", remoteId, true, 100, 10, false, true, null, false));
+
+    vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+        100, isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+        100, isOffHeap()));
+    vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+        100, isOffHeap()));
+    vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+        100, isOffHeap()));
+
+
+    vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+        100, isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+        100, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000));
+
+    // Just making sure that though the remote site is started later,
+    // remote site is still able to get the data. Since the receivers are
+    // started before creating partition region it is quite possible that the
+    // region may lose some of the events. This needs to be handled by the code
+
+    vm5.invoke(() -> WANTestBase.killSender());
+
+    createReceiverInVMs(vm2, vm3);
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
+
+    long vm2NumDuplicate = vm2.invoke(() -> WANTestBase.getReceiverStats().get(7));
+    long vm3NumDuplicate = vm3.invoke(() -> WANTestBase.getReceiverStats().get(7));
+
+    assertThat(vm2NumDuplicate + vm3NumDuplicate).isEqualTo(40);
+  }
+
+  @Test
+  public void testpersistentWanGateway_checkPossibleDuplicateEvents_afterServerDown() {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    // keep a larger batch to minimize number of exception occurrences in the log
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 200, false, true, null, true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 200, false, true, null, true));
+    vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 200, false, true, null, true));
+    vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 200, false, true, null, true));
+
+    vm4.invoke(createPartitionedRegionRunnable());
+    vm5.invoke(createPartitionedRegionRunnable());
+    vm6.invoke(createPartitionedRegionRunnable());
+    vm7.invoke(createPartitionedRegionRunnable());
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    // make sure all the senders are running before doing any puts
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 3000));
+
+    // Just making sure that though the remote site is started later,
+    // remote site is still able to get the data. Since the receivers are
+    // started before creating partition region it is quite possible that the
+    // region may lose some of the events. This needs to be handled by the code
+
+    vm4.invoke(() -> stopSender("ln"));
+    vm5.invoke(() -> stopSender("ln"));
+    vm6.invoke(() -> stopSender("ln"));
+    vm7.invoke(() -> stopSender("ln"));
+
+    Integer vm4NumDupplicate = vm4.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln"));
+    Integer vm5NumDupplicate = vm5.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln"));
+    Integer vm6NumDupplicate = vm6.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln"));
+    Integer vm7NumDupplicate = vm7.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln"));
+
+    assertThat(vm4NumDupplicate + vm5NumDupplicate + vm6NumDupplicate + vm7NumDupplicate)
+        .isEqualTo(800);
+
+    vm5.invoke(() -> WANTestBase.killSender());
+
+    vm4NumDupplicate = vm4.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln"));
+    vm6NumDupplicate = vm6.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln"));
+    vm7NumDupplicate = vm7.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln"));
+
+    assertThat(vm4NumDupplicate + vm6NumDupplicate + vm7NumDupplicate).isEqualTo(800);
+  }
+
+  protected SerializableRunnableIF createPartitionedRegionRunnable() {
+    return () -> WANTestBase.createPartitionedRegion(getTestMethodName(), "ln", 1, 100,
+        isOffHeap());
+  }
+
+  protected SerializableRunnableIF waitForSenderRunnable() {
+    return () -> WANTestBase.waitForSenderRunningState("ln");
+  }
+
+}
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java
index a0edf12f68..7079f485aa 100644
--- a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java
@@ -107,6 +107,17 @@ public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender {
     }
   }
 
+  @Override
+  public void prepareForStop() {
+    if (!isRunning()) {
+      return;
+    }
+    pause();
+    if (eventProcessor != null && !eventProcessor.isStopped()) {
+      eventProcessor.prepareForStopProcessing();
+    }
+  }
+
   @Override
   public void stop() {
     getLifeCycleLock().writeLock().lock();
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java
index a639d3366d..f32d5b9a05 100644
--- a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java
@@ -129,6 +129,9 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
     return eventProcessor;
   }
 
+  @Override
+  public void prepareForStop() {}
+
   @Override
   public void stop() {
     if (logger.isDebugEnabled()) {