You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2021/03/05 19:36:12 UTC

[GitHub] [geode] DonalEvans commented on a change in pull request #6052: GEODE-8971: Add grace period when stopping gateway sender with group-…

DonalEvans commented on a change in pull request #6052:
URL: https://github.com/apache/geode/pull/6052#discussion_r587859504



##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
##########
@@ -132,22 +134,24 @@ public int eventQueueSize() {
   }
 
   @Override
-  public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue)
+  public boolean enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue)
       throws IOException, CacheException {
-    enqueueEvent(operation, event, substituteValue, false);
+    return enqueueEvent(operation, event, substituteValue, false, null);
   }
 
   @Override
-  public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue,
-      boolean isLastEventInTransaction) throws IOException, CacheException {
+  public boolean enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue,
+      boolean isLastEventInTransaction, Predicate<InternalGatewayQueueEvent> condition)
+      throws IOException, CacheException {
     Region region = event.getRegion();
     // int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation)event);

Review comment:
       The variable `region` is never used, and this commented-out line should be removed.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
##########
@@ -702,44 +704,15 @@ public boolean put(Object object) throws InterruptedException, CacheException {
     boolean isDREvent = isDREvent(sender.getCache(), value);
 
     String regionPath = value.getRegionPath();

Review comment:
       This line should be moved into the `getRegionPathForEventAndType()` method and the `regionPath` argument should be removed from the method signature.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
##########
@@ -847,7 +855,14 @@ private EventsAndLastKey peekEventsWithTransactionId(TransactionId transactionId
     }
   }
 
-  EventsAndLastKey getElementsMatching(Predicate condition, Predicate stopCondition, long lastKey) {
+  /**
+   * This method returns a list of objects that fulfill the matchingPredicate
+   * If a matching object also fulfills the endPredicate then the method
+   * stops looking for more matching objects.
+   */
+  List<KeyAndEventPair> getElementsMatching(Predicate<InternalGatewayQueueEvent> condition,

Review comment:
       With the change to this method's return type, `EventsAndLastKey` is no longer used, so that class should be removed.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
##########
@@ -862,19 +877,35 @@ EventsAndLastKey getElementsMatching(Predicate condition, Predicate stopConditio
         continue;
       }
 
-      if (condition.test(object)) {
-        elementsMatching.add(object);
-        peekedIds.add(currentKey);
-        extraPeekedIds.add(currentKey);
-        lastKey = currentKey;
+      if (condition.test((InternalGatewayQueueEvent) object)) {

Review comment:
       The casts here can be cleaned up a bit by declaring `object` as a `InternalGatewayQueueEvent` on line 866 and then casting to `InternalGatewayQueueEvent` when assigned from `optimalGet()`. It would also be good to rename `object` to `event`.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
##########
@@ -453,21 +454,22 @@ public Object peek() {
    * If a matching object also fulfills the endPredicate then the method
    * stops looking for more matching objects.
    */
-  public List<Object> getElementsMatching(Predicate matchingPredicate, Predicate endPredicate) {
+  public List<Object> getElementsMatching(Predicate<InternalGatewayQueueEvent> matchingPredicate,
+      Predicate<InternalGatewayQueueEvent> endPredicate) {
     getInitializationLock().readLock().lock();
     try {
       if (this.getPartitionedRegion().isDestroyed()) {
         throw new BucketRegionQueueUnavailableException();
       }
-      List<Object> elementsMatching = new ArrayList<Object>();
+      List<Object> elementsMatching = new ArrayList();

Review comment:
       The compiler warning here can be fixed by using `new ArrayList<>();`

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
##########
@@ -453,21 +454,22 @@ public Object peek() {
    * If a matching object also fulfills the endPredicate then the method
    * stops looking for more matching objects.
    */
-  public List<Object> getElementsMatching(Predicate matchingPredicate, Predicate endPredicate) {
+  public List<Object> getElementsMatching(Predicate<InternalGatewayQueueEvent> matchingPredicate,
+      Predicate<InternalGatewayQueueEvent> endPredicate) {
     getInitializationLock().readLock().lock();
     try {
       if (this.getPartitionedRegion().isDestroyed()) {
         throw new BucketRegionQueueUnavailableException();
       }
-      List<Object> elementsMatching = new ArrayList<Object>();
+      List<Object> elementsMatching = new ArrayList();
       Iterator<Object> it = this.eventSeqNumDeque.iterator();
       while (it.hasNext()) {
         Object key = it.next();
         Object object = optimalGet(key);
-        if (matchingPredicate.test(object)) {
+        if (matchingPredicate.test((InternalGatewayQueueEvent) object)) {

Review comment:
       I believe there's a possibility of a ClassCastException here, since there's no guarantee that the Objects returned from `optimalGet()` in this class can be cast to `InternalGatewayQueueEvent`. It would be best to check that `object` is an instance of InternalGatewayQueueEvent prior to casting it here.

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueJUnitTest.java
##########
@@ -137,8 +137,8 @@ public void peekGetsExtraEventsWhenMustGroupTransactionEventsAndNotAllEventsForT
         .doAnswer(invocation -> null)
         .when(queue).peekAhead();
 
-    doAnswer(invocation -> new SerialGatewaySenderQueue.EventsAndLastKey(
-        Arrays.asList(new Object[] {event4}), 2L))
+    doAnswer(invocation -> Arrays

Review comment:
       Compiler warnings here can be fixed by changing this to `invocation -> Collections.singletonList(new SerialGatewaySenderQueue.KeyAndEventPair(1L, event4)))`

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    System.out.println("Stopping sender");

Review comment:
       Print statements like this shouldn't be used in tests. If logging is needed for assisting with debugging, either a logger should be added, or a descriptive message added to the `invoke()` method call that is being traced (in this case, in  `WANTestBase.stopSenderInVMsAsync()`).

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    int entries =
+        transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after stop: " + batchesReceived);

Review comment:
       See earlier comment about not using `System.out` print statements in tests.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    int entries =
+        transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after stop: " + batchesReceived);
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+      if (orderRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int shipmentRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+      if (shipmentRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    System.out.println("orderRegionSize: " + shipmentRegionSize);

Review comment:
       More print statements that should either be removed entirely or replaced with logger output.

##########
File path: geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java
##########
@@ -174,7 +174,32 @@
    */
   int GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES =
       Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "get-transaction-events-from-queue-retries",
-          10);
+          2);

Review comment:
       What is the reason for this being changed from 10 to 2?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
##########
@@ -1797,6 +1817,22 @@ public static String getSenderId(String regionName) {
     return regionName.substring(1, queueStringStart);
   }
 
+  public boolean isThereEventsMatching(GatewaySenderEventImpl event,
+      Predicate<InternalGatewayQueueEvent> condition) {
+    boolean isDREvent = isDREvent(sender.getCache(), event);
+
+    String regionPath = event.getRegionPath();

Review comment:
       See earlier comment about moving this line into the `getRegionPathForEventAndType()` method.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
##########
@@ -1797,6 +1817,22 @@ public static String getSenderId(String regionName) {
     return regionName.substring(1, queueStringStart);
   }
 
+  public boolean isThereEventsMatching(GatewaySenderEventImpl event,

Review comment:
       This method (and other methods with the same name) might be better named "hasEventsMatching".

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
##########
@@ -702,44 +704,15 @@ public boolean put(Object object) throws InterruptedException, CacheException {
     boolean isDREvent = isDREvent(sender.getCache(), value);
 
     String regionPath = value.getRegionPath();
-    if (!isDREvent) {
-      Region region = sender.getCache().getRegion(regionPath, true);
-      regionPath = ColocationHelper.getLeaderRegion((PartitionedRegion) region).getFullPath();
-    }
-    if (isDebugEnabled) {
-      logger.debug("Put is for the region {}", regionPath);
-    }
-    if (!this.userRegionNameToShadowPRMap.containsKey(regionPath)) {
-      if (isDebugEnabled) {
-        logger.debug("The userRegionNameToshadowPRMap is {}", userRegionNameToShadowPRMap);
-      }
-      logger.warn(
-          "GatewaySender: Not queuing the event {}, as the region for which this event originated is not yet configured in the GatewaySender",
-          value);
-      // does not put into queue
+    regionPath = getRegionPathForEventAndType(value, isDREvent, regionPath);
+    if (regionPath == null)

Review comment:
       Per the Geode style guide (https://cwiki.apache.org/confluence/display/GEODE/Code+Style+Guide), curly braces should be used for single-line if statements.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
##########
@@ -702,44 +704,15 @@ public boolean put(Object object) throws InterruptedException, CacheException {
     boolean isDREvent = isDREvent(sender.getCache(), value);
 
     String regionPath = value.getRegionPath();
-    if (!isDREvent) {
-      Region region = sender.getCache().getRegion(regionPath, true);
-      regionPath = ColocationHelper.getLeaderRegion((PartitionedRegion) region).getFullPath();
-    }
-    if (isDebugEnabled) {
-      logger.debug("Put is for the region {}", regionPath);
-    }
-    if (!this.userRegionNameToShadowPRMap.containsKey(regionPath)) {
-      if (isDebugEnabled) {
-        logger.debug("The userRegionNameToshadowPRMap is {}", userRegionNameToShadowPRMap);
-      }
-      logger.warn(
-          "GatewaySender: Not queuing the event {}, as the region for which this event originated is not yet configured in the GatewaySender",
-          value);
-      // does not put into queue
+    regionPath = getRegionPathForEventAndType(value, isDREvent, regionPath);
+    if (regionPath == null)
       return false;
-    }
 
     PartitionedRegion prQ = this.userRegionNameToShadowPRMap.get(regionPath);
     int bucketId = value.getBucketId();
-    Object key = null;
-    if (!isDREvent) {
-      key = value.getShadowKey();
-
-      if ((Long) key == -1) {
-        // In case of parallel we don't expect
-        // the key to be not set. If it is the case then the event must be coming
-        // through listener, so return.
-        if (isDebugEnabled) {
-          logger.debug("ParallelGatewaySenderOrderedQueue not putting key {} : Value : {}", key,
-              value);
-        }
-        // does not put into queue
-        return false;
-      }
-    } else {
-      key = value.getEventId();
-    }
+    Object key = getKeyForEventAndType(value, isDREvent);
+    if (key == null)

Review comment:
       Another single-line if statement that should have curly braces.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
##########
@@ -862,19 +877,35 @@ EventsAndLastKey getElementsMatching(Predicate condition, Predicate stopConditio
         continue;
       }
 
-      if (condition.test(object)) {
-        elementsMatching.add(object);
-        peekedIds.add(currentKey);
-        extraPeekedIds.add(currentKey);
-        lastKey = currentKey;
+      if (condition.test((InternalGatewayQueueEvent) object)) {
+        elementsMatching.add(new KeyAndEventPair(currentKey, (GatewaySenderEventImpl) object));
 
-        if (stopCondition.test(object)) {
+        if (stopCondition.test((InternalGatewayQueueEvent) object)) {
           break;
         }
       }
     }
 
-    return new EventsAndLastKey(elementsMatching, lastKey);
+    return elementsMatching;
+  }
+
+  public boolean isThereEventsMatching(Predicate<InternalGatewayQueueEvent> condition) {
+    InternalGatewayQueueEvent object;

Review comment:
       `object` would be better named `event`.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
##########
@@ -418,10 +424,13 @@ protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) {
   }
 
   @Override
-  protected void enqueueEvent(GatewayQueueEvent event) {
+  protected boolean enqueueEvent(GatewayQueueEvent event,
+      Predicate<InternalGatewayQueueEvent> condition) {
     for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
-      serialProcessor.enqueueEvent(event);
+      // TODO revisit handling of "condition" when the following enqueueEvent() method is supported:

Review comment:
       Rather than adding a new TODO here, it might be okay to just rely on the exiting TODO in `SerialGatewaySenderEventProcessor.enqueueEvent()`

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
##########
@@ -870,6 +843,49 @@ public boolean put(Object object) throws InterruptedException, CacheException {
     return putDone;
   }
 
+  private String getRegionPathForEventAndType(GatewaySenderEventImpl event, boolean isDREvent,
+      String regionPath) {
+    boolean isDebugEnabled = logger.isDebugEnabled();
+    if (!isDREvent) {
+      Region region = sender.getCache().getRegion(regionPath, true);

Review comment:
       As the variable `region` is only used once, and always as a `PartitionedRegion`, it can be declared as and cast to a PartitionedRegion on this line, which prevents the "Raw Types" compiler warning here.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    int entries =
+        transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after stop: " + batchesReceived);
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+      if (orderRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int shipmentRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+      if (shipmentRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    System.out.println("orderRegionSize: " + shipmentRegionSize);
+    System.out.println("shipmentRegionSize: " + shipmentRegionSize);
+
+    System.out.println("v4List.get(0): " + v4List.get(0));
+    System.out.println("v5List.get(0): " + v5List.get(0));
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    // Only complete transactions (1 order + 10 shipments) must be replicated
+    assertEquals(10, shipmentRegionSize / orderRegionSize);
+    assertEquals(0, shipmentRegionSize % orderRegionSize);
+
+    System.out.println("Starting sender");
+    startSenderInVMsAsync("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+    v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after restart: " + batchesReceived);
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+      if (orderRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    shipmentRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+    while (true) {

Review comment:
       See earlier comments about not using `Thread.sleep` in tests.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
##########
@@ -862,19 +877,35 @@ EventsAndLastKey getElementsMatching(Predicate condition, Predicate stopConditio
         continue;
       }
 
-      if (condition.test(object)) {
-        elementsMatching.add(object);
-        peekedIds.add(currentKey);
-        extraPeekedIds.add(currentKey);
-        lastKey = currentKey;
+      if (condition.test((InternalGatewayQueueEvent) object)) {
+        elementsMatching.add(new KeyAndEventPair(currentKey, (GatewaySenderEventImpl) object));
 
-        if (stopCondition.test(object)) {
+        if (stopCondition.test((InternalGatewayQueueEvent) object)) {
           break;
         }
       }
     }
 
-    return new EventsAndLastKey(elementsMatching, lastKey);
+    return elementsMatching;
+  }
+
+  public boolean isThereEventsMatching(Predicate<InternalGatewayQueueEvent> condition) {

Review comment:
       This method would be better named "hasEventsMatching".

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
##########
@@ -1100,7 +1108,21 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event,
           // Get substitution value to enqueue if necessary
           Object substituteValue = getSubstituteValue(clonedEvent, operation);
 
-          ev.enqueueEvent(operation, clonedEvent, substituteValue, isLastEventInTransaction);
+          Predicate<InternalGatewayQueueEvent> hasSameTransactionIdPredicate = null;
+          // In case the sender is about to be stopped, the event will only
+          // be queued if there is any event in the queue with the same
+          // transactionId as the one of this event
+          if (isStopping && mustGroupTransactionEvents()
+              && clonedEvent.getTransactionId() != null) {
+            hasSameTransactionIdPredicate =
+                x -> x instanceof GatewaySenderEventImpl && clonedEvent.getTransactionId() != null

Review comment:
       The `instanceof` check here can be safely replaced with a null check, as the predicate is typed such that `x` is an instance of `InternalGatewayQueueEvent`, which is all that is required for the call to `getTransactionId()`.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
##########
@@ -1100,7 +1108,21 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event,
           // Get substitution value to enqueue if necessary
           Object substituteValue = getSubstituteValue(clonedEvent, operation);
 
-          ev.enqueueEvent(operation, clonedEvent, substituteValue, isLastEventInTransaction);
+          Predicate<InternalGatewayQueueEvent> hasSameTransactionIdPredicate = null;
+          // In case the sender is about to be stopped, the event will only
+          // be queued if there is any event in the queue with the same
+          // transactionId as the one of this event
+          if (isStopping && mustGroupTransactionEvents()
+              && clonedEvent.getTransactionId() != null) {
+            hasSameTransactionIdPredicate =
+                x -> x instanceof GatewaySenderEventImpl && clonedEvent.getTransactionId() != null
+                    && clonedEvent.getTransactionId()
+                        .equals(((GatewaySenderEventImpl) x).getTransactionId());

Review comment:
       The cast to `GatewaySenderEventImpl` here is unnecessary

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
##########
@@ -847,7 +855,14 @@ private EventsAndLastKey peekEventsWithTransactionId(TransactionId transactionId
     }
   }
 
-  EventsAndLastKey getElementsMatching(Predicate condition, Predicate stopCondition, long lastKey) {
+  /**
+   * This method returns a list of objects that fulfill the matchingPredicate
+   * If a matching object also fulfills the endPredicate then the method
+   * stops looking for more matching objects.
+   */
+  List<KeyAndEventPair> getElementsMatching(Predicate<InternalGatewayQueueEvent> condition,
+      Predicate<InternalGatewayQueueEvent> stopCondition,
+      long lastKey) {
     Object object;
     List elementsMatching = new ArrayList<>();

Review comment:
       The compiler warnings associated with `elementsMatching` here can be fixed by using `List<KeyAndEventPair>`

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    int entries =
+        transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after stop: " + batchesReceived);
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+      if (orderRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int shipmentRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+      if (shipmentRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    System.out.println("orderRegionSize: " + shipmentRegionSize);
+    System.out.println("shipmentRegionSize: " + shipmentRegionSize);
+
+    System.out.println("v4List.get(0): " + v4List.get(0));
+    System.out.println("v5List.get(0): " + v5List.get(0));
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    // Only complete transactions (1 order + 10 shipments) must be replicated
+    assertEquals(10, shipmentRegionSize / orderRegionSize);
+    assertEquals(0, shipmentRegionSize % orderRegionSize);
+
+    System.out.println("Starting sender");
+    startSenderInVMsAsync("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+    v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after restart: " + batchesReceived);

Review comment:
       Another print statement.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
##########
@@ -686,6 +689,22 @@ public boolean beforeEnqueue(GatewayQueueEvent gatewayEvent) {
     return enqueue;
   }
 
+  protected void preStop() {
+    if (!mustGroupTransactionEvents() || isStopping) {

Review comment:
       If two calls to `preStop()` are made in succession (for instance if two threads call `stop()` at almost the same time), the second thread will immediately return from `preStop()` and continue with the `stop()` method while the first thread sits and waits. I think that this would bypass the intended behaviour and not cause the shutdown to pause while waiting for incomplete transactions.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
##########
@@ -195,6 +207,9 @@ protected void setIsStopped(boolean isStopped) {
     } else {
       this.isStopped = isStopped;
     }
+    if (isStopped) {

Review comment:
       The `setIstStopped()` method can be simplified to:
   ```
     protected void setIsStopped(boolean isStopped) {
       this.isStopped = isStopped;
       if (isStopped) {
         this.failureLogInterval.clear();
         sender.postStop();
       }
     }
   ```

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,276 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    final Map<Object, Object> keyValues = new LinkedHashMap<>();
+    int entries = 2200;

Review comment:
       Is there a reason for this specific value being used for `entries`?

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    int entries =
+        transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after stop: " + batchesReceived);
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+      if (orderRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int shipmentRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+      if (shipmentRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    System.out.println("orderRegionSize: " + shipmentRegionSize);
+    System.out.println("shipmentRegionSize: " + shipmentRegionSize);
+
+    System.out.println("v4List.get(0): " + v4List.get(0));
+    System.out.println("v5List.get(0): " + v5List.get(0));
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    // Only complete transactions (1 order + 10 shipments) must be replicated
+    assertEquals(10, shipmentRegionSize / orderRegionSize);
+    assertEquals(0, shipmentRegionSize % orderRegionSize);
+
+    System.out.println("Starting sender");
+    startSenderInVMsAsync("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+    v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after restart: " + batchesReceived);
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+      if (orderRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    shipmentRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+      if (shipmentRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    System.out.println("orderRegionSize after restart: " + shipmentRegionSize);
+    System.out.println("shipmentRegionSize after restart: " + shipmentRegionSize);
+
+    System.out.println("v4List.get(0): " + v4List.get(0));
+    System.out.println("v5List.get(0): " + v5List.get(0));
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    // Only complete transactions (1 order + 10 shipments) must be replicated
+    assertEquals(10, shipmentRegionSize / orderRegionSize);
+    assertEquals(0, shipmentRegionSize % orderRegionSize);
+  }
+
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStartedReceiverStopped()

Review comment:
       As there is a lot of duplicated code between this test and the one above it, all comments applying to that test case likely also apply here.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    int entries =
+        transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after stop: " + batchesReceived);
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+      if (orderRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int shipmentRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+      if (shipmentRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    System.out.println("orderRegionSize: " + shipmentRegionSize);
+    System.out.println("shipmentRegionSize: " + shipmentRegionSize);
+
+    System.out.println("v4List.get(0): " + v4List.get(0));
+    System.out.println("v5List.get(0): " + v5List.get(0));
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    // Only complete transactions (1 order + 10 shipments) must be replicated
+    assertEquals(10, shipmentRegionSize / orderRegionSize);
+    assertEquals(0, shipmentRegionSize % orderRegionSize);
+
+    System.out.println("Starting sender");
+    startSenderInVMsAsync("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+    v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after restart: " + batchesReceived);
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+      if (orderRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    shipmentRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+      if (shipmentRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    System.out.println("orderRegionSize after restart: " + shipmentRegionSize);
+    System.out.println("shipmentRegionSize after restart: " + shipmentRegionSize);
+
+    System.out.println("v4List.get(0): " + v4List.get(0));
+    System.out.println("v5List.get(0): " + v5List.get(0));
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    // Only complete transactions (1 order + 10 shipments) must be replicated
+    assertEquals(10, shipmentRegionSize / orderRegionSize);

Review comment:
       I think these could be replaced with one assert, as described above.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    int entries =
+        transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after stop: " + batchesReceived);
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+      if (orderRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int shipmentRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+      if (shipmentRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    System.out.println("orderRegionSize: " + shipmentRegionSize);
+    System.out.println("shipmentRegionSize: " + shipmentRegionSize);
+
+    System.out.println("v4List.get(0): " + v4List.get(0));
+    System.out.println("v5List.get(0): " + v5List.get(0));
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    // Only complete transactions (1 order + 10 shipments) must be replicated
+    assertEquals(10, shipmentRegionSize / orderRegionSize);
+    assertEquals(0, shipmentRegionSize % orderRegionSize);
+
+    System.out.println("Starting sender");
+    startSenderInVMsAsync("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+    v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after restart: " + batchesReceived);
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+    while (true) {

Review comment:
       See earlier comments about not using `Thread.sleep` in tests.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
##########
@@ -478,6 +480,24 @@ public Object peek() {
     }
   }
 
+  public boolean isThereEventsMatching(Predicate<InternalGatewayQueueEvent> matchingPredicate) {
+    getInitializationLock().readLock().lock();
+    try {
+      if (this.getPartitionedRegion().isDestroyed()) {
+        throw new BucketRegionQueueUnavailableException();
+      }
+      for (Object o : eventSeqNumDeque) {
+        Object object = optimalGet(o);
+        if (matchingPredicate.test((InternalGatewayQueueEvent) object)) {

Review comment:
       I believe there is a possibility of a ClassCastException here. There should be an `instanceof` check on `object` prior to casting.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
##########
@@ -478,6 +480,24 @@ public Object peek() {
     }
   }
 
+  public boolean isThereEventsMatching(Predicate<InternalGatewayQueueEvent> matchingPredicate) {

Review comment:
       This method would be better named "hasEventsMatching".

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,276 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    final Map<Object, Object> keyValues = new LinkedHashMap<>();
+    int entries = 2200;
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+
+    int eventsPerTransaction = 11;

Review comment:
       I assume that this value is related to `batchSize`, so it would be good to make that explicit rather than just having it a seeminly arbitrary number.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
##########
@@ -2231,6 +2231,9 @@ public String toString() {
     if (this.tailKey != -1) {
       buf.append(";tailKey=" + tailKey);
     }
+    if (this.isTransactional()) {
+      buf.append(";transactionId=" + getTransactionId());

Review comment:
       The compiler warning on this line (and on line 2232) can be fixed by chaining append calls rather than doing String concatenation on the argument.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();

Review comment:
       This would be better as `final Map<Object, Object>` as it's preferable to declare and handle interfaces rather than implementations unless absolutely necessary.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()

Review comment:
       There is a lot of duplicate code in this test and the other added in this class. It would be good to pull as much of it out into its own methods as possible.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;

Review comment:
       I suspect that the values chosen for `transactionsPerCustomer` and `shipmentsPerTransaction` are related to the batch size for this test, so if that's the case, it would be good to make it explicit.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
##########
@@ -1320,6 +1320,23 @@ public static void checkGatewayReceiverStats(int processBatches, int eventsRecei
     assertEquals(creates, gatewayReceiverStats.getCreateRequest());
   }
 
+  public static List<Integer> getReceiverStats() {

Review comment:
       It might be more user-friendly to have this method return the actual `GatewayReceiverStats` object, as otherwise, anyone who calls this method will have to come and check which stats correspond to which index in the returned list.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    int entries =
+        transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);

Review comment:
       This lambda can be replaced with a metod reference

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    int entries =
+        transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after stop: " + batchesReceived);
+
+    ArrayList<Integer> v4List =

Review comment:
       These should be declared as `List<Integer>`, which removes the need to cast them here and elsewhere.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    int entries =
+        transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {

Review comment:
       Is there a better way to determine that replication has finished? While loops with `Thread.sleep` are notorious for causing flakiness in tests. Perhaps checking/comparing the stats in each VM using `await()`?

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    int entries =
+        transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after stop: " + batchesReceived);
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+      if (orderRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int shipmentRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+    while (true) {

Review comment:
       See earlier comment about avoiding using `Thread.sleep` in tests.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    int entries =
+        transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after stop: " + batchesReceived);
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+    while (true) {

Review comment:
       See earlier comment about avoiding using `Thread.sleep` in tests.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    int entries =
+        transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after stop: " + batchesReceived);
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+      if (orderRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int shipmentRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+      if (shipmentRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    System.out.println("orderRegionSize: " + shipmentRegionSize);
+    System.out.println("shipmentRegionSize: " + shipmentRegionSize);
+
+    System.out.println("v4List.get(0): " + v4List.get(0));
+    System.out.println("v5List.get(0): " + v5List.get(0));
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    // Only complete transactions (1 order + 10 shipments) must be replicated
+    assertEquals(10, shipmentRegionSize / orderRegionSize);
+    assertEquals(0, shipmentRegionSize % orderRegionSize);
+
+    System.out.println("Starting sender");

Review comment:
       Another print statement.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    int entries =
+        transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after stop: " + batchesReceived);
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+      if (orderRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int shipmentRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+      if (shipmentRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    System.out.println("orderRegionSize: " + shipmentRegionSize);
+    System.out.println("shipmentRegionSize: " + shipmentRegionSize);
+
+    System.out.println("v4List.get(0): " + v4List.get(0));
+    System.out.println("v5List.get(0): " + v5List.get(0));
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    // Only complete transactions (1 order + 10 shipments) must be replicated
+    assertEquals(10, shipmentRegionSize / orderRegionSize);

Review comment:
       Could these two asserts be replaced with one, asserting that `shipmentRegionSize` is equal to `10 * orderRegionSize`?

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    int entries =
+        transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after stop: " + batchesReceived);
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+      if (orderRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int shipmentRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+      if (shipmentRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    System.out.println("orderRegionSize: " + shipmentRegionSize);
+    System.out.println("shipmentRegionSize: " + shipmentRegionSize);
+
+    System.out.println("v4List.get(0): " + v4List.get(0));
+    System.out.println("v5List.get(0): " + v5List.get(0));
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    // Only complete transactions (1 order + 10 shipments) must be replicated
+    assertEquals(10, shipmentRegionSize / orderRegionSize);
+    assertEquals(0, shipmentRegionSize % orderRegionSize);
+
+    System.out.println("Starting sender");
+    startSenderInVMsAsync("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+    v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {

Review comment:
       See earlier comments about not using `Thread.sleep` in tests.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,276 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()

Review comment:
       There is a lot of duplicated code between this test and the other added to this class. It should be pulled out into methods to avoid duplication.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    int entries =
+        transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after stop: " + batchesReceived);
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+      if (orderRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int shipmentRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+      if (shipmentRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    System.out.println("orderRegionSize: " + shipmentRegionSize);
+    System.out.println("shipmentRegionSize: " + shipmentRegionSize);
+
+    System.out.println("v4List.get(0): " + v4List.get(0));
+    System.out.println("v5List.get(0): " + v5List.get(0));
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    // Only complete transactions (1 order + 10 shipments) must be replicated
+    assertEquals(10, shipmentRegionSize / orderRegionSize);
+    assertEquals(0, shipmentRegionSize % orderRegionSize);
+
+    System.out.println("Starting sender");
+    startSenderInVMsAsync("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+    v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived after restart: " + batchesReceived);
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+      if (orderRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    shipmentRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+    while (true) {
+      Thread.sleep(1000);
+      int newRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName));
+      if (shipmentRegionSize == newRegionSize) {
+        break;
+      }
+    }
+
+    System.out.println("orderRegionSize after restart: " + shipmentRegionSize);

Review comment:
       More print statements.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,276 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    final Map<Object, Object> keyValues = new LinkedHashMap<>();
+    int entries = 2200;
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+
+    int eventsPerTransaction = 11;
+    System.out.println("Starting puts");

Review comment:
       Another print statement that should be removed. In this case, a description can be added as the first argument to the `invokeAsync()` call below to help with debugging.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,276 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    final Map<Object, Object> keyValues = new LinkedHashMap<>();
+    int entries = 2200;
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+
+    int eventsPerTransaction = 11;
+    System.out.println("Starting puts");
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues,
+                eventsPerTransaction));
+
+    // wait for batches to be distributed and then stop the sender
+    System.out.println("Waiting for some batches to be distributed");
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+    System.out
+        .println("Some batches distributed: " + vm4.invoke(() -> getSenderStats("ln", -1).get(4)));
+
+    addIgnoredException("Exception occurred in CacheListener");

Review comment:
       It would be helpful to add a comment explaining why these exceptions can be ignored here.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,276 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    final Map<Object, Object> keyValues = new LinkedHashMap<>();
+    int entries = 2200;
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+
+    int eventsPerTransaction = 11;
+    System.out.println("Starting puts");
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues,
+                eventsPerTransaction));
+
+    // wait for batches to be distributed and then stop the sender
+    System.out.println("Waiting for some batches to be distributed");

Review comment:
       Another print statement that can be moved to being the first argument of the `invoke()` call.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,276 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    final Map<Object, Object> keyValues = new LinkedHashMap<>();
+    int entries = 2200;
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+
+    int eventsPerTransaction = 11;
+    System.out.println("Starting puts");
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues,
+                eventsPerTransaction));
+
+    // wait for batches to be distributed and then stop the sender
+    System.out.println("Waiting for some batches to be distributed");
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+    System.out
+        .println("Some batches distributed: " + vm4.invoke(() -> getSenderStats("ln", -1).get(4)));
+
+    addIgnoredException("Exception occurred in CacheListener");
+    addIgnoredException(RejectedExecutionException.class);
+
+    System.out.println("Stopping sender");

Review comment:
       More print statements.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,276 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    final Map<Object, Object> keyValues = new LinkedHashMap<>();
+    int entries = 2200;
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+
+    int eventsPerTransaction = 11;
+    System.out.println("Starting puts");
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues,
+                eventsPerTransaction));
+
+    // wait for batches to be distributed and then stop the sender
+    System.out.println("Waiting for some batches to be distributed");
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+    System.out
+        .println("Some batches distributed: " + vm4.invoke(() -> getSenderStats("ln", -1).get(4)));
+
+    addIgnoredException("Exception occurred in CacheListener");
+    addIgnoredException(RejectedExecutionException.class);
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    System.out.println("Puts completed");
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {

Review comment:
       See comments about not using `Thread.sleep` in tests.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,276 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    final Map<Object, Object> keyValues = new LinkedHashMap<>();
+    int entries = 2200;
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+
+    int eventsPerTransaction = 11;
+    System.out.println("Starting puts");
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues,
+                eventsPerTransaction));
+
+    // wait for batches to be distributed and then stop the sender
+    System.out.println("Waiting for some batches to be distributed");
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+    System.out
+        .println("Some batches distributed: " + vm4.invoke(() -> getSenderStats("ln", -1).get(4)));
+
+    addIgnoredException("Exception occurred in CacheListener");
+    addIgnoredException(RejectedExecutionException.class);
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    System.out.println("Puts completed");
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    System.out.println("v4List.get(0):" + v4List.get(0));
+    System.out.println("v5List.get(0):" + v5List.get(0));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int regionSize = vm2.invoke(() -> getRegionSize(regionName));
+    while (true) {
+      int oldRegionSize = regionSize;
+      Thread.sleep(1000);
+      regionSize = vm2.invoke(() -> getRegionSize(regionName));
+      if (regionSize == oldRegionSize) {
+        break;
+      }
+    }
+
+    // Only complete transactions (11 entries each) must be replicated
+    assertEquals(0, regionSize % eventsPerTransaction);
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    System.out.println("regionSize: " + regionSize);
+
+    System.out.println("Starting sender");
+    startSenderInVMsAsync("ln", vm4, vm5);
+
+    v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {

Review comment:
       See comments about not using `Thread.sleep` in tests.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,276 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    final Map<Object, Object> keyValues = new LinkedHashMap<>();
+    int entries = 2200;
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+
+    int eventsPerTransaction = 11;
+    System.out.println("Starting puts");
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues,
+                eventsPerTransaction));
+
+    // wait for batches to be distributed and then stop the sender
+    System.out.println("Waiting for some batches to be distributed");
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+    System.out
+        .println("Some batches distributed: " + vm4.invoke(() -> getSenderStats("ln", -1).get(4)));
+
+    addIgnoredException("Exception occurred in CacheListener");
+    addIgnoredException(RejectedExecutionException.class);
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    System.out.println("Puts completed");
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    System.out.println("v4List.get(0):" + v4List.get(0));

Review comment:
       More print statements.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,276 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    final Map<Object, Object> keyValues = new LinkedHashMap<>();
+    int entries = 2200;
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+
+    int eventsPerTransaction = 11;
+    System.out.println("Starting puts");
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues,
+                eventsPerTransaction));
+
+    // wait for batches to be distributed and then stop the sender
+    System.out.println("Waiting for some batches to be distributed");
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+    System.out
+        .println("Some batches distributed: " + vm4.invoke(() -> getSenderStats("ln", -1).get(4)));
+
+    addIgnoredException("Exception occurred in CacheListener");
+    addIgnoredException(RejectedExecutionException.class);
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    System.out.println("Puts completed");
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    System.out.println("v4List.get(0):" + v4List.get(0));
+    System.out.println("v5List.get(0):" + v5List.get(0));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int regionSize = vm2.invoke(() -> getRegionSize(regionName));
+    while (true) {

Review comment:
       See comments about not using `Thread.sleep` in tests.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,276 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    final Map<Object, Object> keyValues = new LinkedHashMap<>();
+    int entries = 2200;
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+
+    int eventsPerTransaction = 11;
+    System.out.println("Starting puts");
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues,
+                eventsPerTransaction));
+
+    // wait for batches to be distributed and then stop the sender
+    System.out.println("Waiting for some batches to be distributed");
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+    System.out
+        .println("Some batches distributed: " + vm4.invoke(() -> getSenderStats("ln", -1).get(4)));
+
+    addIgnoredException("Exception occurred in CacheListener");
+    addIgnoredException(RejectedExecutionException.class);
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    System.out.println("Puts completed");
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    System.out.println("v4List.get(0):" + v4List.get(0));
+    System.out.println("v5List.get(0):" + v5List.get(0));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int regionSize = vm2.invoke(() -> getRegionSize(regionName));
+    while (true) {
+      int oldRegionSize = regionSize;
+      Thread.sleep(1000);
+      regionSize = vm2.invoke(() -> getRegionSize(regionName));
+      if (regionSize == oldRegionSize) {
+        break;
+      }
+    }
+
+    // Only complete transactions (11 entries each) must be replicated
+    assertEquals(0, regionSize % eventsPerTransaction);

Review comment:
       This assertion is not strictly checking what the comment above it is describing. It's possible that the region size could be a multiple of `eventsPerTransaction` but still contain incomplete transactions. Given that we check that the incomplete transactions stat is 0 immediately after, this assertion can probably be removed.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,276 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    final Map<Object, Object> keyValues = new LinkedHashMap<>();
+    int entries = 2200;
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+
+    int eventsPerTransaction = 11;
+    System.out.println("Starting puts");
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues,
+                eventsPerTransaction));
+
+    // wait for batches to be distributed and then stop the sender
+    System.out.println("Waiting for some batches to be distributed");
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+    System.out
+        .println("Some batches distributed: " + vm4.invoke(() -> getSenderStats("ln", -1).get(4)));
+
+    addIgnoredException("Exception occurred in CacheListener");
+    addIgnoredException(RejectedExecutionException.class);
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    System.out.println("Puts completed");
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    System.out.println("v4List.get(0):" + v4List.get(0));
+    System.out.println("v5List.get(0):" + v5List.get(0));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int regionSize = vm2.invoke(() -> getRegionSize(regionName));
+    while (true) {
+      int oldRegionSize = regionSize;
+      Thread.sleep(1000);
+      regionSize = vm2.invoke(() -> getRegionSize(regionName));
+      if (regionSize == oldRegionSize) {
+        break;
+      }
+    }
+
+    // Only complete transactions (11 entries each) must be replicated
+    assertEquals(0, regionSize % eventsPerTransaction);
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    System.out.println("regionSize: " + regionSize);

Review comment:
       More print statements.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,276 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    final Map<Object, Object> keyValues = new LinkedHashMap<>();
+    int entries = 2200;
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+
+    int eventsPerTransaction = 11;
+    System.out.println("Starting puts");
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues,
+                eventsPerTransaction));
+
+    // wait for batches to be distributed and then stop the sender
+    System.out.println("Waiting for some batches to be distributed");
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+    System.out
+        .println("Some batches distributed: " + vm4.invoke(() -> getSenderStats("ln", -1).get(4)));
+
+    addIgnoredException("Exception occurred in CacheListener");
+    addIgnoredException(RejectedExecutionException.class);
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    System.out.println("Puts completed");
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    System.out.println("v4List.get(0):" + v4List.get(0));
+    System.out.println("v5List.get(0):" + v5List.get(0));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int regionSize = vm2.invoke(() -> getRegionSize(regionName));
+    while (true) {
+      int oldRegionSize = regionSize;
+      Thread.sleep(1000);
+      regionSize = vm2.invoke(() -> getRegionSize(regionName));
+      if (regionSize == oldRegionSize) {
+        break;
+      }
+    }
+
+    // Only complete transactions (11 entries each) must be replicated
+    assertEquals(0, regionSize % eventsPerTransaction);
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    System.out.println("regionSize: " + regionSize);
+
+    System.out.println("Starting sender");
+    startSenderInVMsAsync("ln", vm4, vm5);
+
+    v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived: " + batchesReceived);
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    regionSize = vm2.invoke(() -> getRegionSize(regionName));
+    while (true) {

Review comment:
       See comments about not using `Thread.sleep` in tests.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,276 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    final Map<Object, Object> keyValues = new LinkedHashMap<>();
+    int entries = 2200;
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+
+    int eventsPerTransaction = 11;
+    System.out.println("Starting puts");
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues,
+                eventsPerTransaction));
+
+    // wait for batches to be distributed and then stop the sender
+    System.out.println("Waiting for some batches to be distributed");
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+    System.out
+        .println("Some batches distributed: " + vm4.invoke(() -> getSenderStats("ln", -1).get(4)));
+
+    addIgnoredException("Exception occurred in CacheListener");
+    addIgnoredException(RejectedExecutionException.class);
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    System.out.println("Puts completed");
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    System.out.println("v4List.get(0):" + v4List.get(0));
+    System.out.println("v5List.get(0):" + v5List.get(0));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int regionSize = vm2.invoke(() -> getRegionSize(regionName));
+    while (true) {
+      int oldRegionSize = regionSize;
+      Thread.sleep(1000);
+      regionSize = vm2.invoke(() -> getRegionSize(regionName));
+      if (regionSize == oldRegionSize) {
+        break;
+      }
+    }
+
+    // Only complete transactions (11 entries each) must be replicated
+    assertEquals(0, regionSize % eventsPerTransaction);
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    System.out.println("regionSize: " + regionSize);
+
+    System.out.println("Starting sender");
+    startSenderInVMsAsync("ln", vm4, vm5);
+
+    v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived: " + batchesReceived);

Review comment:
       Another print statement.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,276 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    final Map<Object, Object> keyValues = new LinkedHashMap<>();
+    int entries = 2200;
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+
+    int eventsPerTransaction = 11;
+    System.out.println("Starting puts");
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues,
+                eventsPerTransaction));
+
+    // wait for batches to be distributed and then stop the sender
+    System.out.println("Waiting for some batches to be distributed");
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+    System.out
+        .println("Some batches distributed: " + vm4.invoke(() -> getSenderStats("ln", -1).get(4)));
+
+    addIgnoredException("Exception occurred in CacheListener");
+    addIgnoredException(RejectedExecutionException.class);
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    System.out.println("Puts completed");
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    ArrayList<Integer> v4List =

Review comment:
       These should be declared as `List<Integer>`, which removes the need for casting them here and elsewhere.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,276 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    final Map<Object, Object> keyValues = new LinkedHashMap<>();
+    int entries = 2200;
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+
+    int eventsPerTransaction = 11;
+    System.out.println("Starting puts");
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues,
+                eventsPerTransaction));
+
+    // wait for batches to be distributed and then stop the sender
+    System.out.println("Waiting for some batches to be distributed");
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+    System.out
+        .println("Some batches distributed: " + vm4.invoke(() -> getSenderStats("ln", -1).get(4)));
+
+    addIgnoredException("Exception occurred in CacheListener");
+    addIgnoredException(RejectedExecutionException.class);
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    System.out.println("Puts completed");
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    System.out.println("v4List.get(0):" + v4List.get(0));
+    System.out.println("v5List.get(0):" + v5List.get(0));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int regionSize = vm2.invoke(() -> getRegionSize(regionName));
+    while (true) {
+      int oldRegionSize = regionSize;
+      Thread.sleep(1000);
+      regionSize = vm2.invoke(() -> getRegionSize(regionName));
+      if (regionSize == oldRegionSize) {
+        break;
+      }
+    }
+
+    // Only complete transactions (11 entries each) must be replicated
+    assertEquals(0, regionSize % eventsPerTransaction);
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    System.out.println("regionSize: " + regionSize);
+
+    System.out.println("Starting sender");
+    startSenderInVMsAsync("ln", vm4, vm5);
+
+    v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived: " + batchesReceived);
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    regionSize = vm2.invoke(() -> getRegionSize(regionName));
+    while (true) {
+      int oldRegionSize = regionSize;
+      Thread.sleep(1000);
+      regionSize = vm2.invoke(() -> getRegionSize(regionName));
+      if (regionSize == oldRegionSize) {
+        break;
+      }
+    }
+
+    System.out.println("regionSize: " + regionSize);

Review comment:
       More print statements.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,276 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    final Map<Object, Object> keyValues = new LinkedHashMap<>();
+    int entries = 2200;
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+
+    int eventsPerTransaction = 11;
+    System.out.println("Starting puts");
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues,
+                eventsPerTransaction));
+
+    // wait for batches to be distributed and then stop the sender
+    System.out.println("Waiting for some batches to be distributed");
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+    System.out
+        .println("Some batches distributed: " + vm4.invoke(() -> getSenderStats("ln", -1).get(4)));
+
+    addIgnoredException("Exception occurred in CacheListener");
+    addIgnoredException(RejectedExecutionException.class);
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    System.out.println("Puts completed");
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    System.out.println("v4List.get(0):" + v4List.get(0));
+    System.out.println("v5List.get(0):" + v5List.get(0));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int regionSize = vm2.invoke(() -> getRegionSize(regionName));
+    while (true) {
+      int oldRegionSize = regionSize;
+      Thread.sleep(1000);
+      regionSize = vm2.invoke(() -> getRegionSize(regionName));
+      if (regionSize == oldRegionSize) {
+        break;
+      }
+    }
+
+    // Only complete transactions (11 entries each) must be replicated
+    assertEquals(0, regionSize % eventsPerTransaction);
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    System.out.println("regionSize: " + regionSize);
+
+    System.out.println("Starting sender");
+    startSenderInVMsAsync("ln", vm4, vm5);
+
+    v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived: " + batchesReceived);
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    regionSize = vm2.invoke(() -> getRegionSize(regionName));
+    while (true) {
+      int oldRegionSize = regionSize;
+      Thread.sleep(1000);
+      regionSize = vm2.invoke(() -> getRegionSize(regionName));
+      if (regionSize == oldRegionSize) {
+        break;
+      }
+    }
+
+    System.out.println("regionSize: " + regionSize);
+
+    System.out.println("v4List.get(0):" + v4List.get(0));
+    System.out.println("v5List.get(0):" + v5List.get(0));
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    // Only complete transactions (11 events) must be replicated
+    assertEquals(0, regionSize % eventsPerTransaction);

Review comment:
       See above comment about this assertion not necessarily checking what is being described in the comment.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,276 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, true,
+            groupTransactionEvents));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    final Map<Object, Object> keyValues = new LinkedHashMap<>();
+    int entries = 2200;
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+
+    int eventsPerTransaction = 11;
+    System.out.println("Starting puts");
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues,
+                eventsPerTransaction));
+
+    // wait for batches to be distributed and then stop the sender
+    System.out.println("Waiting for some batches to be distributed");
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+    System.out
+        .println("Some batches distributed: " + vm4.invoke(() -> getSenderStats("ln", -1).get(4)));
+
+    addIgnoredException("Exception occurred in CacheListener");
+    addIgnoredException(RejectedExecutionException.class);
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    System.out.println("Puts completed");
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    System.out.println("v4List.get(0):" + v4List.get(0));
+    System.out.println("v5List.get(0):" + v5List.get(0));
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    int regionSize = vm2.invoke(() -> getRegionSize(regionName));
+    while (true) {
+      int oldRegionSize = regionSize;
+      Thread.sleep(1000);
+      regionSize = vm2.invoke(() -> getRegionSize(regionName));
+      if (regionSize == oldRegionSize) {
+        break;
+      }
+    }
+
+    // Only complete transactions (11 entries each) must be replicated
+    assertEquals(0, regionSize % eventsPerTransaction);
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    System.out.println("regionSize: " + regionSize);
+
+    System.out.println("Starting sender");
+    startSenderInVMsAsync("ln", vm4, vm5);
+
+    v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {
+      int oldBatchesReceived = batchesReceived;
+      Thread.sleep(1000);
+      batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+      if (batchesReceived == oldBatchesReceived) {
+        break;
+      }
+    }
+
+    System.out.println("batchesReceived: " + batchesReceived);
+
+    // Wait for events to replicate: when region size does not change
+    // we can assume that replication has finished.
+    regionSize = vm2.invoke(() -> getRegionSize(regionName));
+    while (true) {
+      int oldRegionSize = regionSize;
+      Thread.sleep(1000);
+      regionSize = vm2.invoke(() -> getRegionSize(regionName));
+      if (regionSize == oldRegionSize) {
+        break;
+      }
+    }
+
+    System.out.println("regionSize: " + regionSize);
+
+    System.out.println("v4List.get(0):" + v4List.get(0));
+    System.out.println("v5List.get(0):" + v5List.get(0));
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    // Only complete transactions (11 events) must be replicated
+    assertEquals(0, regionSize % eventsPerTransaction);
+  }
+
+  @Test
+  public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStartedReceiverStopped()

Review comment:
       Given that there is a large amount of duplicat code between this test case and the one above it, all comments that apply to the first test case also likely apply to this one.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org