You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mi...@apache.org on 2022/06/29 20:38:34 UTC
[geode] branch develop updated: GEODE-9997: added ParallelQueueSetPossibleDuplicateMessage (#7323)
This is an automated email from the ASF dual-hosted git repository.
mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new ef7dc45dd2 GEODE-9997: added ParallelQueueSetPossibleDuplicateMessage (#7323)
ef7dc45dd2 is described below
commit ef7dc45dd24a6241fa748917205aca858f5c1c1b
Author: Mario Ivanac <48...@users.noreply.github.com>
AuthorDate: Wed Jun 29 22:38:28 2022 +0200
GEODE-9997: added ParallelQueueSetPossibleDuplicateMessage (#7323)
* GEODE-9997: added ParallelQueueSetPossibleDuplicateMessage to signal duplicate events on secondary buckets
---
.../codeAnalysis/sanctionedDataSerializables.txt | 4 +
.../internal/ParallelAsyncEventQueueImpl.java | 3 +
.../internal/SerialAsyncEventQueueImpl.java | 3 +
.../org/apache/geode/internal/DSFIDFactory.java | 3 +
.../internal/cache/AbstractBucketRegionQueue.java | 63 ++++-
.../apache/geode/internal/cache/BucketAdvisor.java | 13 ++
.../apache/geode/internal/cache/BucketRegion.java | 10 +
.../geode/internal/cache/BucketRegionQueue.java | 38 +++
.../geode/internal/cache/GemFireCacheImpl.java | 14 +-
.../geode/internal/cache/PartitionedRegion.java | 10 +
.../sockets/command/GatewayReceiverCommand.java | 4 +
.../internal/cache/wan/AbstractGatewaySender.java | 24 ++
.../wan/AbstractGatewaySenderEventProcessor.java | 201 ++++++++++++++--
.../internal/cache/wan/GatewayReceiverStats.java | 31 ++-
.../internal/cache/wan/InternalGatewaySender.java | 4 +
.../wan/parallel/ParallelGatewaySenderQueue.java | 7 +
.../ParallelQueueSetPossibleDuplicateMessage.java | 166 ++++++++++++++
.../xmlcache/ParallelAsyncEventQueueCreation.java | 3 +
.../xmlcache/ParallelGatewaySenderCreation.java | 3 +
.../xmlcache/SerialAsyncEventQueueCreation.java | 3 +
.../xmlcache/SerialGatewaySenderCreation.java | 3 +
.../ParallelGatewaySenderQueueJUnitTest.java | 6 +
...lQueueSetPossibleDuplicateMessageJUnitTest.java | 243 ++++++++++++++++++++
.../serialization/DataSerializableFixedID.java | 1 +
.../geode/internal/cache/wan/WANTestBase.java | 1 +
...tewaySenderCheckPossibleDuplicateDUnitTest.java | 255 +++++++++++++++++++++
.../parallel/ParallelGatewaySenderImpl.java | 11 +
.../internal/serial/SerialGatewaySenderImpl.java | 3 +
28 files changed, 1108 insertions(+), 22 deletions(-)
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 84309aac4f..c75044e381 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1925,6 +1925,10 @@ org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage,2
fromData,15
toData,15
+org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessage,2
+fromData,26
+toData,26
+
org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation$DestroyMessage,2
fromData,46
toData,41
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
index 4afb51d872..d5395df9e9 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
@@ -114,6 +114,9 @@ public class ParallelAsyncEventQueueImpl extends AbstractGatewaySender {
}
}
+ @Override
+ public void prepareForStop() {}
+
@Override
public void stop() {
getLifeCycleLock().writeLock().lock();
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
index 1713feff76..06e6e594e2 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
@@ -135,6 +135,9 @@ public class SerialAsyncEventQueueImpl extends AbstractGatewaySender {
return eventProcessor;
}
+ @Override
+ public void prepareForStop() {}
+
@Override
public void stop() {
if (logger.isDebugEnabled()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index d11a9435b9..e6e3cb96cf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -400,6 +400,7 @@ import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackArgument;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderQueueEntrySynchronizationOperation;
import org.apache.geode.internal.cache.wan.parallel.ParallelQueueRemovalMessage;
+import org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage;
import org.apache.geode.internal.cache.wan.serial.BatchDestroyOperation;
import org.apache.geode.internal.serialization.DSFIDSerializer;
import org.apache.geode.internal.serialization.DataSerializableFixedID;
@@ -985,6 +986,8 @@ public class DSFIDFactory implements DataSerializableFixedID {
serializer.register(ABORT_BACKUP_REQUEST, AbortBackupRequest.class);
serializer.register(HOST_AND_PORT, HostAndPort.class);
serializer.register(DISTRIBUTED_PING_MESSAGE, DistributedPingMessage.class);
+ serializer.register(PARALLEL_QUEUE_SET_POSSIBLE_DUPLICATE_MESSAGE,
+ ParallelQueueSetPossibleDuplicateMessage.class);
}
/**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index 03339a3667..3d9272b71d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -14,8 +14,11 @@
*/
package org.apache.geode.internal.cache;
+import static org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage.LOAD_FROM_TEMP_QUEUE;
+
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -31,14 +34,20 @@ import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.TimeoutException;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderStats;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage;
import org.apache.geode.internal.offheap.OffHeapClearRequired;
+import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.util.internal.GeodeGlossary;
@@ -218,11 +227,12 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
if (queues != null) {
ConcurrentParallelGatewaySenderQueue prq =
(ConcurrentParallelGatewaySenderQueue) queues.toArray()[0];
- // synchronized (prq.getBucketToTempQueueMap()) {
+
BlockingQueue<GatewaySenderEventImpl> tempQueue = prq.getBucketTmpQueue(getId());
- // .getBucketToTempQueueMap().get(getId());
if (tempQueue != null && !tempQueue.isEmpty()) {
synchronized (tempQueue) {
+ Map<String, Map<Integer, List<Object>>> regionToDuplicateEventsMap =
+ new HashMap<>();
try {
// ParallelQueueRemovalMessage checks for the key in BucketRegionQueue
// and if not found there, it removes it from tempQueue. When tempQueue
@@ -235,6 +245,9 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
try {
event.setPossibleDuplicate(true);
if (addToQueue(event.getShadowKey(), event)) {
+ if (notifyDuplicateSupported()) {
+ addDuplicateEvent(regionToDuplicateEventsMap, event);
+ }
event = null;
}
} catch (ForceReattemptException e) {
@@ -257,13 +270,57 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
}
getInitializationLock().writeLock().unlock();
}
+ notifyDuplicateEvents(regionToDuplicateEventsMap);
}
}
+ }
+ }
+
+ private boolean notifyDuplicateSupported() {
+ return !(this.getPartitionedRegion().getParallelGatewaySender().getEventProcessor()
+ .getDispatcher() instanceof GatewaySenderEventCallbackDispatcher);
+ }
+
+ private void notifyDuplicateEvents(
+ Map<String, Map<Integer, List<Object>>> regionToDuplicateEventsMap) {
+ if (regionToDuplicateEventsMap.isEmpty()) {
+ return;
+ }
+ if (getPartitionedRegion().getRegionAdvisor() == null) {
+ return;
+ }
+
+ Set<InternalDistributedMember> recipients =
+ getPartitionedRegion().getRegionAdvisor().adviseDataStore();
+
+ if (recipients.isEmpty()) {
+ return;
+ }
- // }
+ InternalDistributedSystem ids = getCache().getInternalDistributedSystem();
+ DistributionManager dm = ids.getDistributionManager();
+ dm.retainMembersWithSameOrNewerVersion(recipients, KnownVersion.GEODE_1_15_0);
+
+ if (!recipients.isEmpty()) {
+ ParallelQueueSetPossibleDuplicateMessage possibleDuplicateMessage =
+ new ParallelQueueSetPossibleDuplicateMessage(LOAD_FROM_TEMP_QUEUE,
+ regionToDuplicateEventsMap);
+ possibleDuplicateMessage.setRecipients(recipients);
+ dm.putOutgoing(possibleDuplicateMessage);
}
}
+ private void addDuplicateEvent(Map<String, Map<Integer, List<Object>>> regionToDuplicateEventsMap,
+ GatewaySenderEventImpl event) {
+ Map<Integer, List<Object>> bucketIdToDispatchedKeys = regionToDuplicateEventsMap
+ .computeIfAbsent(getPartitionedRegion().getFullPath(), k -> new HashMap<>());
+
+ List<Object> dispatchedKeys =
+ bucketIdToDispatchedKeys.computeIfAbsent(getId(), k -> new ArrayList<>());
+
+ dispatchedKeys.add(event.getShadowKey());
+ }
+
@Override
public void forceSerialized(EntryEventImpl event) {
// NOOP since we want the value in the region queue to stay in object form.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index e6dcd3fb8f..5c3e98dfbf 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -164,6 +164,8 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
private BucketAdvisor startingBucketAdvisor;
+ private volatile boolean hasBecomePrimary = false;
+
private final PartitionedRegion pRegion;
final ConcurrentMap<String, Boolean> destroyedShadowBuckets = new ConcurrentHashMap<>();
@@ -498,6 +500,13 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
}
+ BucketAdvisor getParentAdvisor() {
+ return parentAdvisor;
+ }
+
+ boolean getHasBecomePrimary() {
+ return hasBecomePrimary;
+ }
/**
* Called by the RegionAdvisor.profileRemoved, this method tests to see if the missing member is
@@ -1153,6 +1162,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
try {
synchronized (this) {
if (isHosting() && (isVolunteering() || isBecomingPrimary())) {
+ hasBecomePrimary = isBecomingPrimary();
Bucket br = regionAdvisor.getBucket(getBucket().getId());
if (br instanceof BucketRegion) {
((BucketRegion) br).beforeAcquiringPrimaryState();
@@ -1167,6 +1177,8 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
if (hasPrimary() && isPrimary()) {
shouldInvokeListeners = true;
}
+ } else {
+ hasBecomePrimary = false;
}
}
}
@@ -2159,6 +2171,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
private void changeFromPrimaryTo(byte requestedState) {
try {
primaryState = requestedState;
+ hasBecomePrimary = false;
} finally {
getPartitionedRegionStats().incPrimaryBucketCount(-1);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index ffb1109115..8991441a5a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -217,6 +217,8 @@ public class BucketRegion extends DistributedRegion implements Bucket {
}
}
+ private boolean receivedGatewaySenderStoppedMessage = false;
+
private final int redundancy;
/** the partitioned region to which this bucket belongs */
@@ -2535,4 +2537,12 @@ public class BucketRegion extends DistributedRegion implements Bucket {
return getSystem().getDistributionManager().getOtherDistributionManagerIds();
}
+ public boolean isReceivedGatewaySenderStoppedMessage() {
+ return receivedGatewaySenderStoppedMessage;
+ }
+
+ public void setReceivedGatewaySenderStoppedMessage(boolean notified) {
+ receivedGatewaySenderStoppedMessage = notified;
+ }
+
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index aba62d4d2a..8fb572c236 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -43,6 +43,7 @@ import org.apache.geode.internal.cache.persistence.query.mock.ByteComparator;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.parallel.BucketRegionQueueUnavailableException;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
@@ -202,6 +203,23 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
@Override
public void beforeAcquiringPrimaryState() {
+ PartitionedRegion region = getPartitionedRegion();
+
+ if (region != null && region.getParallelGatewaySender() != null) {
+ AbstractGatewaySenderEventProcessor ep =
+ region.getParallelGatewaySender().getEventProcessor();
+
+ if (ep != null && !(ep.getDispatcher() instanceof GatewaySenderEventCallbackDispatcher)) {
+ if (isReceivedGatewaySenderStoppedMessage()) {
+ setReceivedGatewaySenderStoppedMessage(false);
+ return;
+ }
+ BucketAdvisor parent = getParentAdvisor(getBucketAdvisor());
+ if (parent.getHasBecomePrimary()) {
+ return;
+ }
+ }
+ }
markAsDuplicate.addAll(eventSeqNumDeque);
}
@@ -660,4 +678,24 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
}
}
+ public void setAsPossibleDuplicate(Object key) {
+ Object object = optimalGet(key);
+ if (object != null) {
+ ((GatewaySenderEventImpl) object).setPossibleDuplicate(true);
+ }
+ }
+
+ public boolean checkIfQueueContainsKey(Object key) {
+ return eventSeqNumDeque.contains(key);
+ }
+
+ BucketAdvisor getParentAdvisor(BucketAdvisor advisor) {
+ BucketAdvisor parent = advisor.getParentAdvisor();
+ while (parent != null) {
+ advisor = parent;
+ parent = advisor.getParentAdvisor();
+ }
+ return advisor;
+
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index e385581fbf..71d30adbf6 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -233,6 +233,7 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderAdvisor;
import org.apache.geode.internal.cache.wan.GatewaySenderQueueEntrySynchronizationListener;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
import org.apache.geode.internal.cache.wan.WANServiceProvider;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
import org.apache.geode.internal.cache.xmlcache.CacheServerCreation;
@@ -2182,9 +2183,20 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
return false;
}
+ boolean isDebugEnabled = logger.isDebugEnabled();
+
+ for (GatewaySender sender : allGatewaySenders) {
+ try {
+ ((InternalGatewaySender) sender).prepareForStop();
+ } catch (Exception exception) {
+ if (isDebugEnabled) {
+ logger.debug("When calling Prepare for stop gw sender, ignore exception " + exception);
+ }
+ }
+ }
+
CLOSING_THREAD.set(Thread.currentThread());
try {
- boolean isDebugEnabled = logger.isDebugEnabled();
// First close the ManagementService
system.handleResourceEvent(ResourceEvent.CACHE_REMOVE, this);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 03dd3de027..896af32576 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -771,6 +771,8 @@ public class PartitionedRegion extends LocalRegion
private boolean regionCreationNotified;
+ private boolean sentGatewaySenderStoppedMessage = false;
+
public interface RegionAdvisorFactory {
RegionAdvisor create(PartitionedRegion region);
}
@@ -10170,4 +10172,12 @@ public class PartitionedRegion extends LocalRegion
public boolean areRecoveriesInProgress() {
return prStats.getRecoveriesInProgress() > 0;
}
+
+ public boolean isSentGatewaySenderStoppedMessage() {
+ return sentGatewaySenderStoppedMessage;
+ }
+
+ public void setSentGatewaySenderStoppedMessage(boolean notified) {
+ sentGatewaySenderStoppedMessage = notified;
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
index 14e3ccf061..f133bc51c9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
@@ -186,6 +186,10 @@ public class GatewayReceiverCommand extends BaseCommand {
}
boolean possibleDuplicate = possibleDuplicatePartBytes[0] == 0x01;
+ if (possibleDuplicate) {
+ stats.incPossibleDuplicateEventsReceived();
+ }
+
// Retrieve the region name from the message parts
Part regionNamePart = clientMessage.getPart(partNumber + 2);
String regionName = regionNamePart.getCachedString();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index e5fa44a83e..bd1a898da4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -585,6 +585,9 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di
@Override
public abstract void startWithCleanQueue();
+ @Override
+ public abstract void prepareForStop();
+
@Override
public abstract void stop();
@@ -1298,6 +1301,27 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di
}
}
+ public boolean markAsDuplicateInTempQueueEvents(Object tailKey) {
+ synchronized (queuedEventsSync) {
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+
+ for (TmpQueueEvent event : tmpQueuedEvents) {
+ if (tailKey.equals(event.getEvent().getTailKey())) {
+ if (isDebugEnabled) {
+ logger.debug(
+ "shadowKey {} is found in tmpQueueEvents at AbstractGatewaySender level. Marking it..",
+ tailKey);
+ }
+ event.getEvent().setPossibleDuplicate(true);
+ return true;
+ }
+ }
+
+ return false;
+ }
+ }
+
+
/**
* During sender is getting stopped, if there are any cache operation on queue then that event
* will be stored in temp queue. Once sender is started, these event from tmp queue will be
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index fbdc1d9c2c..c7413aa91f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -18,6 +18,9 @@ import static java.lang.Boolean.TRUE;
import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.EXCLUDE;
import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE;
import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE_LAST_EVENT;
+import static org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage.RESET_BATCH;
+import static org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage.STOPPED_GATEWAY_SENDER;
+import static org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage.UNSUCCESSFULLY_DISPATCHED;
import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
import java.io.IOException;
@@ -29,10 +32,12 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
@@ -46,7 +51,11 @@ import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.ColocationHelper;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
@@ -58,8 +67,10 @@ import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
+import org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage;
import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderQueue;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.pdx.internal.PeerTypeRegistration;
@@ -469,7 +480,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
boolean interrupted = Thread.interrupted();
try {
if (resetLastPeekedEvents) {
- pendingEventsInBatchesMarkAsPossibleDuplicate();
+ notifyPossibleDuplicate(RESET_BATCH, pendingEventsInBatches());
resetLastPeekedEvents();
resetLastPeekedEvents = false;
}
@@ -967,16 +978,8 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
private void handleUnSuccessfulBatchDispatch(List<?> events) {
final GatewaySenderStats statistics = sender.getStatistics();
statistics.incBatchesRedistributed();
-
// Set posDup flag on each event in the batch
- Iterator<?> it = events.iterator();
- while (it.hasNext() && !isStopped) {
- Object o = it.next();
- if (o instanceof GatewaySenderEventImpl) {
- GatewaySenderEventImpl ge = (GatewaySenderEventImpl) o;
- ge.setPossibleDuplicate(true);
- }
- }
+ notifyPossibleDuplicate(UNSUCCESSFULLY_DISPATCHED, events);
}
/**
@@ -1217,7 +1220,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
logger.warn("Destroying GatewayEventDispatcher with actively queued data.");
}
if (resetLastPeekedEvents) {
- pendingEventsInBatchesMarkAsPossibleDuplicate();
+ notifyPossibleDuplicate(STOPPED_GATEWAY_SENDER, pendingEventsInBatches());
resetLastPeekedEvents();
resetLastPeekedEvents = false;
}
@@ -1322,17 +1325,181 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
protected abstract void enqueueEvent(GatewayQueueEvent<?, ?> event);
- private void pendingEventsInBatchesMarkAsPossibleDuplicate() {
+ private void notifyPossibleDuplicate(int reason, List<?> events) {
+ Map<String, Map<Integer, List<Object>>> regionToDispatchedKeysMap = new HashMap<>();
+ boolean pgwsender = (getSender().isParallel()
+ && !(getDispatcher() instanceof GatewaySenderEventCallbackDispatcher));
+
+ for (Object o : events) {
+ if (o instanceof GatewaySenderEventImpl) {
+ GatewaySenderEventImpl ge = (GatewaySenderEventImpl) o;
+ if (!ge.getPossibleDuplicate()) {
+ if (pgwsender) {
+ addDuplicateEvent(regionToDispatchedKeysMap, ge);
+ }
+ ge.setPossibleDuplicate(true);
+ }
+ }
+ }
+
+ if (!pgwsender) {
+ return;
+ }
+
+ PartitionedRegion queueRegion;
+ if (queue instanceof ConcurrentParallelGatewaySenderQueue) {
+ queueRegion =
+ (PartitionedRegion) ((ConcurrentParallelGatewaySenderQueue) queue).getRegion();
+ } else {
+ queueRegion =
+ (PartitionedRegion) ((ParallelGatewaySenderQueue) queue).getRegion();
+ }
+
+ if (queueRegion == null || queueRegion.getRegionAdvisor() == null
+ || queueRegion.getDataStore() == null) {
+ return;
+ }
+
+ if (reason == STOPPED_GATEWAY_SENDER) {
+ final Set<Integer> buckets = queueRegion.getDataStore().getAllLocalPrimaryBucketIds();
+ if (regionToDispatchedKeysMap.isEmpty()) {
+ if (queueRegion.isSentGatewaySenderStoppedMessage()) {
+ return;
+ }
+ Map<Integer, List<Object>> bucketIdToDispatchedKeys = new HashMap<>();
+ for (Integer bId : buckets) {
+ bucketIdToDispatchedKeys.put(bId, Collections.emptyList());
+ }
+ regionToDispatchedKeysMap.put(queueRegion.getFullPath(), bucketIdToDispatchedKeys);
+
+ } else {
+ Map<Integer, List<Object>> bucketIdToDispatchedKeys =
+ regionToDispatchedKeysMap.get(queueRegion.getFullPath());
+ if (bucketIdToDispatchedKeys == null) {
+ return;
+ }
+ for (Integer bId : buckets) {
+ bucketIdToDispatchedKeys.putIfAbsent(bId, Collections.emptyList());
+ }
+ }
+ }
+
+ if (regionToDispatchedKeysMap.size() > 0) {
+ Set<InternalDistributedMember> recipients =
+ getAllRecipients(sender.getCache(), regionToDispatchedKeysMap);
+
+ if (recipients.isEmpty()) {
+ return;
+ }
+
+ if (reason == STOPPED_GATEWAY_SENDER) {
+ if (!queueRegion.isSentGatewaySenderStoppedMessage()) {
+ queueRegion.setSentGatewaySenderStoppedMessage(true);
+ }
+ }
+
+ InternalDistributedSystem ids = sender.getCache().getInternalDistributedSystem();
+ DistributionManager dm = ids.getDistributionManager();
+ dm.retainMembersWithSameOrNewerVersion(recipients, KnownVersion.GEODE_1_15_0);
+
+ if (!recipients.isEmpty()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "notifyPossibleDuplicate send ParallelQueueSetPossibleDuplicateMessage recipients {}.",
+ recipients);
+ }
+
+ ParallelQueueSetPossibleDuplicateMessage pqspdm =
+ new ParallelQueueSetPossibleDuplicateMessage(reason, regionToDispatchedKeysMap);
+ pqspdm.setRecipients(recipients);
+ dm.putOutgoing(pqspdm);
+ }
+ }
+
+ }
+
+ protected void addDuplicateEvent(
+ Map<String, Map<Integer, List<Object>>> regionToDispatchedKeysMap,
+ GatewaySenderEventImpl event) {
+ PartitionedRegion prQ = null;
+ int bucketId = -1;
+ Object key = null;
+ InternalCache cache = sender.getCache();
+ String regionPath = event.getRegionPath();
+
+ if (event.getRegion() != null) {
+ if (cache.getRegion(regionPath) instanceof DistributedRegion) {
+ prQ = ((ParallelGatewaySenderQueue) getQueue()).getRegion(event.getRegion().getFullPath());
+ bucketId = event.getEventId().getBucketID();
+ key = event.getEventId();
+ } else {
+ prQ = ((ParallelGatewaySenderQueue) getQueue()).getRegion(ColocationHelper
+ .getLeaderRegion((PartitionedRegion) event.getRegion()).getFullPath());
+ bucketId = event.getBucketId();
+ key = event.getShadowKey();
+ }
+ } else {
+ Region region = (PartitionedRegion) cache.getRegion(regionPath);
+ if (region != null && !region.isDestroyed()) {
+ if (region instanceof DistributedRegion) {
+ prQ = ((ParallelGatewaySenderQueue) getQueue()).getRegion(region.getFullPath());
+ bucketId = event.getBucketId();
+ key = event.getEventId();
+ } else {
+ prQ = ((ParallelGatewaySenderQueue) getQueue()).getRegion(
+ ColocationHelper.getLeaderRegion((PartitionedRegion) region).getFullPath());
+ bucketId = event.getBucketId();
+ key = event.getShadowKey();
+ }
+ }
+ }
+
+ if (prQ == null) {
+ return;
+ }
+
+ Map<Integer, List<Object>> bucketIdToDispatchedKeys =
+ regionToDispatchedKeysMap.get(prQ.getFullPath());
+ if (bucketIdToDispatchedKeys == null) {
+ bucketIdToDispatchedKeys = new ConcurrentHashMap<>();
+ regionToDispatchedKeysMap.put(prQ.getFullPath(), bucketIdToDispatchedKeys);
+ }
+
+ List<Object> dispatchedKeys = bucketIdToDispatchedKeys.get(bucketId);
+ if (dispatchedKeys == null) {
+ dispatchedKeys = new ArrayList<>();
+ bucketIdToDispatchedKeys.put(bucketId, dispatchedKeys);
+ }
+ dispatchedKeys.add(key);
+
+ }
+
+ public void prepareForStopProcessing() {
+ notifyPossibleDuplicate(STOPPED_GATEWAY_SENDER, pendingEventsInBatches());
+ }
+
+ private Set<InternalDistributedMember> getAllRecipients(InternalCache cache,
+ Map<String, Map<Integer, List<Object>>> map) {
+ Set<InternalDistributedMember> recipients = new ObjectOpenHashSet<>();
+ for (Object pr : map.keySet()) {
+ PartitionedRegion partitionedRegion = (PartitionedRegion) cache.getRegion((String) pr);
+ if (partitionedRegion != null && partitionedRegion.getRegionAdvisor() != null) {
+ recipients.addAll(partitionedRegion.getRegionAdvisor().adviseDataStore());
+ }
+ }
+ return recipients;
+ }
+
+
+ private List<GatewaySenderEventImpl> pendingEventsInBatches() {
+ List<GatewaySenderEventImpl> pendingEvents = new ArrayList<>();
if (!batchIdToEventsMap.isEmpty()) {
for (Map.Entry<Integer, List<GatewaySenderEventImpl>[]> entry : batchIdToEventsMap
.entrySet()) {
- for (GatewaySenderEventImpl event : entry.getValue()[0]) {
- if (!event.getPossibleDuplicate()) {
- event.setPossibleDuplicate(true);
- }
- }
+ pendingEvents.addAll(entry.getValue()[0]);
}
}
+ return pendingEvents;
}
protected static class SenderStopperCallable implements Callable<Boolean> {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java
index 73af7b0b7b..e5764e4151 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java
@@ -32,6 +32,12 @@ public class GatewayReceiverStats extends CacheServerStats {
*/
private static final String DUPLICATE_BATCHES_RECEIVED = "duplicateBatchesReceived";
+ /**
+ * Name of the number of events with possible duplicate indication received statistic
+ */
+ private static final String POSSIBLE_DUPLICATE_EVENTS_RECEIVED =
+ "possibleduplicateEventsReceived";
+
/**
* Name of the event queue time statistic
*/
@@ -86,6 +92,11 @@ public class GatewayReceiverStats extends CacheServerStats {
*/
private final int duplicateBatchesReceivedId;
+ /**
+ * Id of the number of events with possible duplicate indication received statistic
+ */
+ private final int possibleduplicateEventsReceivedId;
+
/**
* Id of the event queue time statistic
*/
@@ -159,7 +170,11 @@ public class GatewayReceiverStats extends CacheServerStats {
f.createLongCounter(EXCEPTIONS_OCCURRED,
"number of exceptions occurred while porcessing the batches", "operations"),
f.createLongCounter(EVENTS_RETRIED,
- "total number events retried by this GatewayReceiver due to exceptions", "operations")};
+ "total number events retried by this GatewayReceiver due to exceptions", "operations"),
+ f.createLongCounter(POSSIBLE_DUPLICATE_EVENTS_RECEIVED,
+ "total number of possible duplicate events received by this GatewayReceiver",
+ "operations")
+ };
return new GatewayReceiverStats(f, ownerName, typeName, descriptors, meterRegistry);
}
@@ -178,6 +193,7 @@ public class GatewayReceiverStats extends CacheServerStats {
unknowsOperationsReceivedId = statType.nameToId(UNKNOWN_OPERATIONS_RECEIVED);
exceptionsOccurredId = statType.nameToId(EXCEPTIONS_OCCURRED);
eventsRetriedId = statType.nameToId(EVENTS_RETRIED);
+ possibleduplicateEventsReceivedId = statType.nameToId(POSSIBLE_DUPLICATE_EVENTS_RECEIVED);
this.meterRegistry = meterRegistry;
eventsReceivedCounter = LegacyStatCounter.builder(EVENTS_RECEIVED_COUNTER_NAME)
@@ -198,6 +214,19 @@ public class GatewayReceiverStats extends CacheServerStats {
return stats.getLong(duplicateBatchesReceivedId);
}
+
+ /**
+ * Increments the number of duplicate events received by 1.
+ */
+ public void incPossibleDuplicateEventsReceived() {
+ stats.incLong(possibleduplicateEventsReceivedId, 1);
+ }
+
+ public long getPossibleDuplicateEventsReceived() {
+ return stats.getLong(possibleduplicateEventsReceivedId);
+ }
+
+
/**
* Increments the number of out of order batches received by 1.
*/
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java
index 13e36e7f9b..c4f48b7dd6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java
@@ -49,4 +49,8 @@ public interface InternalGatewaySender extends GatewaySender {
void setStartEventProcessorInPausedState();
int getEventQueueSize();
+
+
+ void prepareForStop();
+
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 678286c92b..c37b4aefc8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -593,6 +593,13 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
} finally {
if (prQ != null) {
userRegionNameToShadowPRMap.put(userPR.getFullPath(), prQ);
+ prQ.setSentGatewaySenderStoppedMessage(false);
+ if (prQ.getDataStore() != null) {
+ final Set<BucketRegion> buckets = prQ.getDataStore().getAllLocalBucketRegions();
+ for (BucketRegion br : buckets) {
+ br.setReceivedGatewaySenderStoppedMessage(false);
+ }
+ }
}
/*
* Here, enqueueTempEvents need to be invoked when a sender is already running and userPR is
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessage.java
new file mode 100644
index 0000000000..d1c2de3166
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessage.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.PooledDistributionMessage;
+import org.apache.geode.internal.cache.BucketRegionQueue;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.LocalRegion.InitializationLevel;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+/**
+ * Sets events in the remote secondary queues to possible duplicate
+ *
+ * @since Geode 1.15
+ */
+public class ParallelQueueSetPossibleDuplicateMessage extends PooledDistributionMessage {
+
+ private static final Logger logger = LogService.getLogger();
+
+ private int reason;
+ private Map<String, Map<Integer, List<Object>>> regionToDuplicateEventsMap;
+
+ public static final int UNSUCCESSFULLY_DISPATCHED = 0;
+ public static final int RESET_BATCH = 1;
+ public static final int LOAD_FROM_TEMP_QUEUE = 2;
+ public static final int STOPPED_GATEWAY_SENDER = 3;
+
+
+ public ParallelQueueSetPossibleDuplicateMessage() {}
+
+ public ParallelQueueSetPossibleDuplicateMessage(int reason,
+ Map<String, Map<Integer, List<Object>>> regionToDuplicateEventsMap) {
+ this.reason = reason;
+ this.regionToDuplicateEventsMap = regionToDuplicateEventsMap;
+ }
+
+ @Override
+ public int getDSFID() {
+ return PARALLEL_QUEUE_SET_POSSIBLE_DUPLICATE_MESSAGE;
+ }
+
+ @Override
+ public String toString() {
+ String cname = getShortClassName();
+ final StringBuilder sb = new StringBuilder(cname);
+ sb.append("reason=").append(reason);
+ sb.append(" regionToDispatchedKeysMap=").append(regionToDuplicateEventsMap);
+ sb.append(" sender=").append(getSender());
+ return sb.toString();
+ }
+
+ @Override
+ protected void process(ClusterDistributionManager dm) {
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ final InternalCache cache = dm.getCache();
+
+ if (cache == null) {
+ return;
+ }
+ final InitializationLevel oldLevel =
+ LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE);
+ try {
+ for (String name : regionToDuplicateEventsMap.keySet()) {
+ final PartitionedRegion region = (PartitionedRegion) cache.getRegion(name);
+ if (region == null) {
+ continue;
+ }
+
+ AbstractGatewaySender abstractSender = region.getParallelGatewaySender();
+ // Find the map: bucketId to dispatchedKeys
+ // Find the bucket
+ // Destroy the keys
+ Map<Integer, List<Object>> bucketIdToDispatchedKeys =
+ this.regionToDuplicateEventsMap.get(name);
+ for (Integer bId : bucketIdToDispatchedKeys.keySet()) {
+ final String bucketFullPath =
+ SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME + SEPARATOR
+ + region.getBucketName(bId);
+ BucketRegionQueue brq =
+ (BucketRegionQueue) cache.getInternalRegionByPath(bucketFullPath);
+
+ if (brq != null && reason == STOPPED_GATEWAY_SENDER) {
+ brq.setReceivedGatewaySenderStoppedMessage(true);
+ }
+
+ if (isDebugEnabled) {
+ logger.debug(
+ "ParallelQueueSetPossibleDuplicateMessage : The bucket in the cache is bucketRegionName : {} bucket: {}",
+ bucketFullPath, brq);
+ }
+
+ List<Object> dispatchedKeys = bucketIdToDispatchedKeys.get(bId);
+ if (dispatchedKeys != null && !dispatchedKeys.isEmpty()) {
+ for (Object key : dispatchedKeys) {
+ // First, clear the Event from tempQueueEvents at AbstractGatewaySender level, if
+ // exists
+ // synchronize on AbstractGatewaySender.queuedEventsSync while doing so
+ abstractSender.markAsDuplicateInTempQueueEvents(key);
+
+ if (brq != null) {
+ if (isDebugEnabled) {
+ logger.debug(
+ "ParallelQueueSetPossibleDuplicateMessage : The bucket {} key {}.",
+ brq, key);
+ }
+
+ if (brq.checkIfQueueContainsKey(key)) {
+ brq.setAsPossibleDuplicate(key);
+ }
+ }
+ }
+ }
+ }
+ } //
+ } finally {
+ LocalRegion.setThreadInitLevelRequirement(oldLevel);
+ }
+ }
+
+ @Override
+ public void toData(DataOutput out,
+ SerializationContext context) throws IOException {
+ super.toData(out, context);
+ DataSerializer.writeInteger(this.reason, out);
+ DataSerializer.writeHashMap(this.regionToDuplicateEventsMap, out);
+ }
+
+ @Override
+ public void fromData(DataInput in,
+ DeserializationContext context) throws IOException, ClassNotFoundException {
+ super.fromData(in, context);
+ this.reason = DataSerializer.readInteger(in);
+ this.regionToDuplicateEventsMap = DataSerializer.readHashMap(in);
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
index b299925fbd..b9433dcbc6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
@@ -48,6 +48,9 @@ public class ParallelAsyncEventQueueCreation extends AbstractGatewaySender
@Override
public void startWithCleanQueue() {}
+ @Override
+ public void prepareForStop() {}
+
@Override
public void stop() {}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
index 82adbee5dd..561f0b4634 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
@@ -48,6 +48,9 @@ public class ParallelGatewaySenderCreation extends AbstractGatewaySender impleme
@Override
public void startWithCleanQueue() {}
+ @Override
+ public void prepareForStop() {}
+
@Override
public void stop() {}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
index 4b44799e06..834f6ad474 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
@@ -47,6 +47,9 @@ public class SerialAsyncEventQueueCreation extends AbstractGatewaySender impleme
@Override
public void startWithCleanQueue() {}
+ @Override
+ public void prepareForStop() {}
+
@Override
public void stop() {}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
index b0f02ceb91..a239fd9972 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
@@ -48,6 +48,9 @@ public class SerialGatewaySenderCreation extends AbstractGatewaySender implement
@Override
public void startWithCleanQueue() {}
+ @Override
+ public void prepareForStop() {}
+
@Override
public void stop() {}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
index 92d1601d8b..7e8000f0d1 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
@@ -556,10 +556,13 @@ public class ParallelGatewaySenderQueueJUnitTest {
targetRs.add(region);
PartitionedRegion shadowRegion = mock(PartitionedRegion.class);
+ PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class);
when(regionFactory.create(any())).thenReturn(shadowRegion);
when(shadowRegion.getFullPath()).thenReturn("_PARALLEL_GATEWAY_SENDER_QUEUE");
+ when(shadowRegion.getDataStore()).thenReturn(dataStore);
+ when(dataStore.getAllLocalBucketRegions()).thenReturn(Collections.emptySet());
mockGatewaySenderStats();
@@ -624,10 +627,13 @@ public class ParallelGatewaySenderQueueJUnitTest {
targetRs.add(region);
PartitionedRegion shadowRegion = mock(PartitionedRegion.class);
+ PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class);
when(regionFactory.create(any())).thenReturn(shadowRegion);
when(shadowRegion.getFullPath()).thenReturn("_PARALLEL_GATEWAY_SENDER_QUEUE");
+ when(shadowRegion.getDataStore()).thenReturn(dataStore);
+ when(dataStore.getAllLocalBucketRegions()).thenReturn(Collections.emptySet());
mockGatewaySenderStats();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessageJUnitTest.java
new file mode 100644
index 0000000000..0b822ba361
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueSetPossibleDuplicateMessageJUnitTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.internal.cache.wan.parallel.ParallelQueueSetPossibleDuplicateMessage.UNSUCCESSFULLY_DISPATCHED;
+import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.ToDoubleFunction;
+
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Timer;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.Statistics;
+import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DSClock;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.BucketRegionQueue;
+import org.apache.geode.internal.cache.BucketRegionQueueHelper;
+import org.apache.geode.internal.cache.CachePerfStats;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.GatewaySenderStats;
+import org.apache.geode.internal.statistics.DummyStatisticsFactory;
+import org.apache.geode.internal.statistics.StatisticsManager;
+
+public class ParallelQueueSetPossibleDuplicateMessageJUnitTest {
+
+ private static final String GATEWAY_SENDER_ID = "ny";
+ private static final int BUCKET_ID = 85;
+ private static final long KEY = 198;
+
+ private GemFireCacheImpl cache;
+ private PartitionedRegion queueRegion;
+ private AbstractGatewaySender sender;
+ private PartitionedRegion rootRegion;
+ private BucketRegionQueue bucketRegionQueue;
+ private BucketRegionQueueHelper bucketRegionQueueHelper;
+ private GatewaySenderStats stats;
+
+ @Before
+ public void setUpGemFire() {
+ createCache();
+ createQueueRegion();
+ createGatewaySender();
+ createRootRegion();
+ createBucketRegionQueue();
+ }
+
+ private void createCache() {
+ // Mock cache
+ cache = mock(GemFireCacheImpl.class);
+ DistributedSystem ds = mock(DistributedSystem.class);
+ InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+
+ MeterRegistry mr = mock(MeterRegistry.class);
+ StatisticsManager sm = mock(StatisticsManager.class);
+ ClusterDistributionManager dm = mock(ClusterDistributionManager.class);
+
+ when(cache.getDistributedSystem()).thenReturn(ds);
+ when(cache.getInternalDistributedSystem()).thenReturn(ids);
+ when(cache.getDistributionManager()).thenReturn(dm);
+
+ when(cache.getCachePerfStats()).thenReturn(mock(CachePerfStats.class));
+ when(cache.getMeterRegistry()).thenReturn(mr);
+ when(cache.getCancelCriterion()).thenReturn(mock(CancelCriterion.class));
+
+
+ cache.getCancelCriterion().checkCancelInProgress(null);
+
+ when(ds.createAtomicStatistics(any(), anyString())).thenReturn(mock(Statistics.class));
+ when(ids.getStatisticsManager()).thenReturn(sm);
+ when(ids.getClock()).thenReturn(mock(DSClock.class));
+ when(ids.getDistributionManager()).thenReturn(dm);
+ when(ids.getDistributedMember()).thenReturn(mock(InternalDistributedMember.class));
+
+ when(mr.timer(any(), any(), any())).thenReturn(mock(Timer.class));
+ when(mr.gauge(anyString(), any(), any(ToDoubleFunction.class))).thenReturn(mock(Gauge.class));
+ when(mr.config()).thenReturn(mock(MeterRegistry.Config.class));
+
+ when(sm.createAtomicStatistics(any(), anyString())).thenReturn(mock(Statistics.class));
+
+ when(dm.getConfig()).thenReturn(mock(DistributionConfig.class));
+ when(dm.getCache()).thenReturn(cache);
+
+ }
+
+ private void createQueueRegion() {
+ // Mock queue region
+ queueRegion =
+ ParallelGatewaySenderHelper.createMockQueueRegion(cache,
+ ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID));
+ }
+
+ private void createGatewaySender() {
+ // Mock gateway sender
+ sender = ParallelGatewaySenderHelper.createGatewaySender(cache);
+ when(queueRegion.getParallelGatewaySender()).thenReturn(sender);
+ when(sender.getQueues()).thenReturn(null);
+ when(sender.getDispatcherThreads()).thenReturn(1);
+ stats = new GatewaySenderStats(new DummyStatisticsFactory(), "gatewaySenderStats-", "ln",
+ disabledClock());
+ when(sender.getStatistics()).thenReturn(stats);
+ }
+
+ private void createRootRegion() {
+ // Mock root region
+ rootRegion = mock(PartitionedRegion.class);
+ when(rootRegion.getFullPath())
+ .thenReturn(SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME);
+ when(cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true))
+ .thenReturn(rootRegion);
+ when(cache.getRegion(ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID)))
+ .thenReturn(queueRegion);
+ }
+
+ private void createBucketRegionQueue() {
+ // Create BucketRegionQueue
+ BucketRegionQueue realBucketRegionQueue = ParallelGatewaySenderHelper
+ .createBucketRegionQueue(cache, rootRegion, queueRegion, BUCKET_ID);
+ bucketRegionQueue = spy(realBucketRegionQueue);
+
+ bucketRegionQueueHelper =
+ new BucketRegionQueueHelper(cache, queueRegion, bucketRegionQueue);
+ }
+
+ @Test
+ public void validateSetPossibleDuplicateKeyInUninitializedBucketRegionQueue() throws Exception {
+
+ assertThat(bucketRegionQueue.isInitialized()).isFalse();
+
+ // Create a real ConcurrentParallelGatewaySenderQueue
+ ParallelGatewaySenderEventProcessor processor =
+ ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(sender);
+ GatewaySenderEventImpl gsEvent = mock(GatewaySenderEventImpl.class);
+
+ // Add a mock GatewaySenderEventImpl to the temp queue
+ BlockingQueue<GatewaySenderEventImpl> tempQueue =
+ createTempQueueAndAddEvent(processor, gsEvent);
+ assertThat(tempQueue.size()).isEqualTo(1);
+
+ createAndProcessParallelQueueSetPossibleDuplicateMessage();
+
+ verify(sender, times(1)).markAsDuplicateInTempQueueEvents(KEY);
+ verify(bucketRegionQueue, times(0)).setAsPossibleDuplicate(KEY);
+
+ }
+
+ @Test
+ public void validateSetPossibleDuplicateKeyInInitializedBucketRegionQueue() throws Exception {
+
+ assertThat(bucketRegionQueue.isInitialized()).isFalse();
+
+ // Create a real ConcurrentParallelGatewaySenderQueue
+ ParallelGatewaySenderEventProcessor processor =
+ ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(sender);
+ GatewaySenderEventImpl gsEvent = mock(GatewaySenderEventImpl.class);
+
+ // Add a mock GatewaySenderEventImpl to the temp queue
+ BlockingQueue<GatewaySenderEventImpl> tempQueue =
+ createTempQueueAndAddEvent(processor, gsEvent);
+ assertThat(tempQueue.size()).isEqualTo(1);
+
+ // Clean up destroyed tokens
+ bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
+
+ assertThat(bucketRegionQueue.isInitialized()).isTrue();
+
+ bucketRegionQueue.pushKeyIntoQueue(KEY);
+
+ createAndProcessParallelQueueSetPossibleDuplicateMessage();
+
+ verify(sender, times(1)).markAsDuplicateInTempQueueEvents(KEY);
+ verify(bucketRegionQueue, times(1)).setAsPossibleDuplicate(KEY);
+
+ }
+
+ private void createAndProcessParallelQueueSetPossibleDuplicateMessage() {
+ ParallelQueueSetPossibleDuplicateMessage message =
+ new ParallelQueueSetPossibleDuplicateMessage(UNSUCCESSFULLY_DISPATCHED,
+ createRegionToDispatchedKeysMap());
+ message.process((ClusterDistributionManager) cache.getDistributionManager());
+ }
+
+ private HashMap<String, Map<Integer, List<Object>>> createRegionToDispatchedKeysMap() {
+ HashMap<String, Map<Integer, List<Object>>> regionToDispatchedKeys = new HashMap<>();
+ Map<Integer, List<Object>> bucketIdToDispatchedKeys = new HashMap<>();
+ List<Object> dispatchedKeys = new ArrayList<>();
+ dispatchedKeys.add(KEY);
+ bucketIdToDispatchedKeys.put(BUCKET_ID, dispatchedKeys);
+ regionToDispatchedKeys.put(ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID),
+ bucketIdToDispatchedKeys);
+ return regionToDispatchedKeys;
+ }
+
+ private BlockingQueue<GatewaySenderEventImpl> createTempQueueAndAddEvent(
+ ParallelGatewaySenderEventProcessor processor, GatewaySenderEventImpl event) {
+ ParallelGatewaySenderQueue queue = (ParallelGatewaySenderQueue) processor.getQueue();
+ Map<Integer, BlockingQueue<GatewaySenderEventImpl>> tempQueueMap =
+ queue.getBucketToTempQueueMap();
+ BlockingQueue<GatewaySenderEventImpl> tempQueue = new LinkedBlockingQueue<>();
+ when(event.getShadowKey()).thenReturn(KEY);
+ tempQueue.add(event);
+ tempQueueMap.put(BUCKET_ID, tempQueue);
+ return tempQueue;
+ }
+}
diff --git a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
index bf7ff09380..a7c9e45b11 100644
--- a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
+++ b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
@@ -683,6 +683,7 @@ public interface DataSerializableFixedID extends SerializationVersions, BasicSer
short ABORT_BACKUP_REQUEST = 2183;
short MEMBER_IDENTIFIER = 2184;
short HOST_AND_PORT = 2185;
+ short PARALLEL_QUEUE_SET_POSSIBLE_DUPLICATE_MESSAGE = 2186;
// NOTE, codes > 65535 will take 4 bytes to serialize
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 7461bffe3d..2969a87296 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -1281,6 +1281,7 @@ public class WANTestBase extends DistributedTestCase {
statsList.add(gatewayReceiverStats.getOutoforderBatchesReceived());
statsList.add(gatewayReceiverStats.getEarlyAcks());
statsList.add(gatewayReceiverStats.getExceptionsOccurred());
+ statsList.add(gatewayReceiverStats.getPossibleDuplicateEventsReceived());
return statsList;
}
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderCheckPossibleDuplicateDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderCheckPossibleDuplicateDUnitTest.java
new file mode 100644
index 0000000000..f9248ef7d7
--- /dev/null
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderCheckPossibleDuplicateDUnitTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.SerializableRunnableIF;
+import org.apache.geode.test.junit.categories.WanTest;
+
+@Category({WanTest.class})
+public class ParallelWANPersistenceEnabledGatewaySenderCheckPossibleDuplicateDUnitTest
+ extends WANTestBase {
+
+ private static final long serialVersionUID = 2L;
+ private static final Logger logger = LogService.getLogger();
+
+ public ParallelWANPersistenceEnabledGatewaySenderCheckPossibleDuplicateDUnitTest() {
+ super();
+ }
+
+
+ private final int localId = 1;
+ private final int remoteId = 2;
+
+ @Override
+ protected final void postSetUpWANTestBase() throws Exception {
+ // The restart tests log this string
+ IgnoredException.addIgnoredException("failed accepting client connection");
+ }
+
+ /**
+ * When gateway senders starts to unqueue, and check that received events are
+ * not marked as possible duplicate.
+ */
+ @Test
+ public void testPersistentPartitionedRegionWithGatewaySenderCheckReceiverNoPossibleDuplicate()
+ throws InterruptedException {
+ int lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(localId));
+ int nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(remoteId, lnPort));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5);
+ vm4.invoke(() -> setNumDispatcherThreadsForTheRun(5));
+ vm5.invoke(() -> setNumDispatcherThreadsForTheRun(5));
+
+ vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, false));
+ vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, false));
+
+ vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 100, isOffHeap()));
+ vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 100, isOffHeap()));
+
+
+ vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+ 100, isOffHeap()));
+ vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+ 100, isOffHeap()));
+
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
+
+ long vm2NumDuplicate = vm2.invoke(() -> WANTestBase.getReceiverStats().get(7));
+ long vm3NumDuplicate = vm3.invoke(() -> WANTestBase.getReceiverStats().get(7));
+
+ assertThat(vm2NumDuplicate + vm3NumDuplicate).isEqualTo(0);
+ }
+
+
+ /**
+ * When gateway senders starts to unqueue, stop gateway sender, and check that some evnts are
+ * dispatched to receiving side,
+ * but events are not removed on sending side.
+ */
+ @Test
+ public void testPersistentPartitionedRegionWithGatewaySenderCheckReceiverPossibleDuplicate()
+ throws InterruptedException {
+ int lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(localId));
+ int nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(remoteId, lnPort));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5);
+ vm4.invoke(() -> setNumDispatcherThreadsForTheRun(5));
+ vm5.invoke(() -> setNumDispatcherThreadsForTheRun(5));
+
+ vm4.invoke(
+ () -> WANTestBase.createSender("ln", remoteId, true, 100, 10, false, true, null, false));
+ vm5.invoke(
+ () -> WANTestBase.createSender("ln", remoteId, true, 100, 10, false, true, null, false));
+
+ vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 100, isOffHeap()));
+ vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 100, isOffHeap()));
+
+
+ vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+ 100, isOffHeap()));
+ vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+ 100, isOffHeap()));
+
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000));
+
+ createReceiverInVMs(vm2, vm3);
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
+
+ long vm2NumDuplicate = vm2.invoke(() -> WANTestBase.getReceiverStats().get(7));
+ long vm3NumDuplicate = vm3.invoke(() -> WANTestBase.getReceiverStats().get(7));
+
+ assertThat(vm2NumDuplicate + vm3NumDuplicate).isEqualTo(100);
+ }
+
+ @Test
+ public void testpersistentWanGateway_CheckReceiverPossibleDuplicate_afterSenderRestarted() {
+ int lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(localId));
+ int nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(remoteId, lnPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ // keep a larger batch to minimize number of exception occurrences in the log
+ vm4.invoke(
+ () -> WANTestBase.createSender("ln", remoteId, true, 100, 10, false, true, null, false));
+ vm5.invoke(
+ () -> WANTestBase.createSender("ln", remoteId, true, 100, 10, false, true, null, false));
+ vm6.invoke(
+ () -> WANTestBase.createSender("ln", remoteId, true, 100, 10, false, true, null, false));
+ vm7.invoke(
+ () -> WANTestBase.createSender("ln", remoteId, true, 100, 10, false, true, null, false));
+
+ vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 100, isOffHeap()));
+ vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 100, isOffHeap()));
+ vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 100, isOffHeap()));
+ vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 100, isOffHeap()));
+
+
+ vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+ 100, isOffHeap()));
+ vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+ 100, isOffHeap()));
+
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000));
+
+ // Just making sure that though the remote site is started later,
+ // remote site is still able to get the data. Since the receivers are
+ // started before creating partition region it is quite possible that the
+ // region may loose some of the events. This needs to be handled by the code
+
+ vm5.invoke(() -> WANTestBase.killSender());
+
+ createReceiverInVMs(vm2, vm3);
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
+
+ long vm2NumDuplicate = vm2.invoke(() -> WANTestBase.getReceiverStats().get(7));
+ long vm3NumDuplicate = vm3.invoke(() -> WANTestBase.getReceiverStats().get(7));
+
+ assertThat(vm2NumDuplicate + vm3NumDuplicate).isEqualTo(40);
+ }
+
+ @Test
+ public void testpersistentWanGateway_checkPossibleDuplicateEvents_afterServerDown() {
+ Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ // keep a larger batch to minimize number of exception occurrences in the log
+ vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 200, false, true, null, true));
+ vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 200, false, true, null, true));
+ vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 200, false, true, null, true));
+ vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 200, false, true, null, true));
+
+ vm4.invoke(createPartitionedRegionRunnable());
+ vm5.invoke(createPartitionedRegionRunnable());
+ vm6.invoke(createPartitionedRegionRunnable());
+ vm7.invoke(createPartitionedRegionRunnable());
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ // make sure all the senders are running before doing any puts
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 3000));
+
+ // Just making sure that though the remote site is started later,
+ // remote site is still able to get the data. Since the receivers are
+ // started before creating partition region it is quite possible that the
+ // region may loose some of the events. This needs to be handled by the code
+
+ vm4.invoke(() -> stopSender("ln"));
+ vm5.invoke(() -> stopSender("ln"));
+ vm6.invoke(() -> stopSender("ln"));
+ vm7.invoke(() -> stopSender("ln"));
+
+ Integer vm4NumDupplicate = vm4.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln"));
+ Integer vm5NumDupplicate = vm5.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln"));
+ Integer vm6NumDupplicate = vm6.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln"));
+ Integer vm7NumDupplicate = vm7.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln"));
+
+ assertThat(vm4NumDupplicate + vm5NumDupplicate + vm6NumDupplicate + vm7NumDupplicate)
+ .isEqualTo(800);
+
+ vm5.invoke(() -> WANTestBase.killSender());
+
+ vm4NumDupplicate = vm4.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln"));
+ vm6NumDupplicate = vm6.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln"));
+ vm7NumDupplicate = vm7.invoke(() -> WANTestBase.getNumOfPossibleDuplicateEvents("ln"));
+
+ assertThat(vm4NumDupplicate + vm6NumDupplicate + vm7NumDupplicate).isEqualTo(800);
+ }
+
+ protected SerializableRunnableIF createPartitionedRegionRunnable() {
+ return () -> WANTestBase.createPartitionedRegion(getTestMethodName(), "ln", 1, 100,
+ isOffHeap());
+ }
+
+ protected SerializableRunnableIF waitForSenderRunnable() {
+ return () -> WANTestBase.waitForSenderRunningState("ln");
+ }
+
+}
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java
index a0edf12f68..7079f485aa 100644
--- a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java
@@ -107,6 +107,17 @@ public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender {
}
}
+ @Override
+ public void prepareForStop() {
+ if (!isRunning()) {
+ return;
+ }
+ pause();
+ if (eventProcessor != null && !eventProcessor.isStopped()) {
+ eventProcessor.prepareForStopProcessing();
+ }
+ }
+
@Override
public void stop() {
getLifeCycleLock().writeLock().lock();
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java
index a639d3366d..f32d5b9a05 100644
--- a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java
@@ -129,6 +129,9 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
return eventProcessor;
}
+ @Override
+ public void prepareForStop() {}
+
@Override
public void stop() {
if (logger.isDebugEnabled()) {