You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mk...@apache.org on 2020/12/14 09:57:34 UTC

[geode] branch develop updated: GEODE-8765: Fix NullPointerException when group-transaction-events an… (#5829)

This is an automated email from the ASF dual-hosted git repository.

mkevo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4c00984  GEODE-8765: Fix NullPointerException when group-transaction-events an… (#5829)
4c00984 is described below

commit 4c00984eb3c32972b4b03fdbe2b6397decdc177f
Author: Alberto Gomez <al...@est.tech>
AuthorDate: Mon Dec 14 10:56:31 2020 +0100

    GEODE-8765: Fix NullPointerException when group-transaction-events an… (#5829)
    
    * GEODE-8765: Fix NullPointerException when group-transaction-events and events in and not in transactions are sent.
---
 .../wan/parallel/ParallelGatewaySenderQueue.java   |  60 +++++---
 .../cache/wan/serial/SerialGatewaySenderQueue.java |  58 ++++----
 .../internal/cache/BucketRegionQueueJUnitTest.java |  41 +++---
 .../parallel/ParallelWANPropagationDUnitTest.java  | 137 ++++++++++++++++--
 ...lWANPropagation_PartitionedRegionDUnitTest.java | 160 ++++++++++++++++++---
 5 files changed, 351 insertions(+), 105 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 18c1624..2100d54 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1260,7 +1260,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     addPeekedEvents(batch, batchSize == BATCH_BASED_ON_TIME_ONLY ? DEFAULT_BATCH_SIZE : batchSize);
 
     int bId;
-    Map<TransactionId, Integer> incompleteTransactionsInBatch = new HashMap<>();
     while (batchSize == BATCH_BASED_ON_TIME_ONLY || batch.size() < batchSize) {
       if (areLocalBucketQueueRegionsPresent() && ((bId = getRandomPrimaryBucket(prQ)) != -1)) {
         GatewaySenderEventImpl object = (GatewaySenderEventImpl) peekAhead(prQ, bId);
@@ -1280,13 +1279,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
             logger.debug("The gatewayEventImpl in peek is {}", object);
           }
           batch.add(object);
-          if (object.getTransactionId() != null) {
-            if (object.isLastEventInTransaction()) {
-              incompleteTransactionsInBatch.remove(object.getTransactionId());
-            } else {
-              incompleteTransactionsInBatch.put(object.getTransactionId(), bId);
-            }
-          }
           peekedEvents.add(object);
 
         } else {
@@ -1316,7 +1308,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     }
 
     if (batch.size() > 0) {
-      peekEventsFromIncompleteTransactions(batch, incompleteTransactionsInBatch, prQ);
+      peekEventsFromIncompleteTransactions(batch, prQ);
     }
 
     if (isDebugEnabled) {
@@ -1351,12 +1343,14 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   }
 
   private void peekEventsFromIncompleteTransactions(List<GatewaySenderEventImpl> batch,
-      Map<TransactionId, Integer> incompleteTransactionIdsInBatch, PartitionedRegion prQ) {
+      PartitionedRegion prQ) {
     if (!mustGroupTransactionEvents()) {
       return;
     }
 
-    if (areAllTransactionsCompleteInBatch(incompleteTransactionIdsInBatch)) {
+    Map<TransactionId, Integer> incompleteTransactionIdsInBatch =
+        getIncompleteTransactionsInBatch(batch);
+    if (incompleteTransactionIdsInBatch.size() == 0) {
       return;
     }
 
@@ -1389,8 +1383,19 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     }
   }
 
-  private boolean areAllTransactionsCompleteInBatch(Map incompleteTransactions) {
-    return (incompleteTransactions.size() == 0);
+  private Map<TransactionId, Integer> getIncompleteTransactionsInBatch(
+      List<GatewaySenderEventImpl> batch) {
+    Map<TransactionId, Integer> incompleteTransactionsInBatch = new HashMap<>();
+    for (GatewaySenderEventImpl event : batch) {
+      if (event.getTransactionId() != null) {
+        if (event.isLastEventInTransaction()) {
+          incompleteTransactionsInBatch.remove(event.getTransactionId());
+        } else {
+          incompleteTransactionsInBatch.put(event.getTransactionId(), event.getBucketId());
+        }
+      }
+    }
+    return incompleteTransactionsInBatch;
   }
 
   @VisibleForTesting
@@ -1472,19 +1477,18 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     for (int i = 0; i < batchSize || incompleteTransactionsInBatch.size() != 0; i++) {
       GatewaySenderEventImpl event = this.peekedEventsProcessing.remove();
       batch.add(event);
-      if (event.getTransactionId() != null) {
-        if (event.isLastEventInTransaction()) {
-          incompleteTransactionsInBatch.remove(event.getTransactionId());
-        } else {
-          incompleteTransactionsInBatch.add(event.getTransactionId());
+      if (mustGroupTransactionEvents()) {
+        if (event.getTransactionId() != null) {
+          if (event.isLastEventInTransaction()) {
+            incompleteTransactionsInBatch.remove(event.getTransactionId());
+          } else {
+            incompleteTransactionsInBatch.add(event.getTransactionId());
+          }
         }
       }
       if (this.peekedEventsProcessing.isEmpty()) {
         this.resetLastPeeked = false;
         this.peekedEventsProcessingInProgress = false;
-        if (incompleteTransactionsInBatch.size() != 0) {
-          logger.error("A batch with incomplete transactions has been sent.");
-        }
         break;
       }
     }
@@ -1547,9 +1551,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
     try {
       Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate =
-          x -> x.getTransactionId().equals(transactionId);
+          getHasTransactionIdPredicate(transactionId);
       Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate =
-          x -> x.isLastEventInTransaction();
+          getIsLastEventInTransactionPredicate();
       objects =
           brq.getElementsMatching(hasTransactionIdPredicate, isLastEventInTransactionPredicate);
     } catch (BucketRegionQueueUnavailableException e) {
@@ -1561,6 +1565,16 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     // finished with peeked objects.
   }
 
+  @VisibleForTesting
+  public static Predicate<GatewaySenderEventImpl> getIsLastEventInTransactionPredicate() {
+    return x -> x.isLastEventInTransaction();
+  }
+
+  @VisibleForTesting
+  public static Predicate<GatewaySenderEventImpl> getHasTransactionIdPredicate(
+      TransactionId transactionId) {
+    return x -> transactionId.equals(x.getTransactionId());
+  }
 
   protected BucketRegionQueue getBucketRegionQueueByBucketId(final PartitionedRegion prQ,
       final int bucketId) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 6b57fa0..192af19 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -413,12 +413,12 @@ public class SerialGatewaySenderQueue implements RegionQueue {
   }
 
   @Override
-  public List<AsyncEvent> peek(int size) throws CacheException {
+  public List<AsyncEvent<?, ?>> peek(int size) throws CacheException {
     return peek(size, -1);
   }
 
   @Override
-  public List<AsyncEvent> peek(int size, int timeToWait) throws CacheException {
+  public List<AsyncEvent<?, ?>> peek(int size, int timeToWait) throws CacheException {
     final boolean isTraceEnabled = logger.isTraceEnabled();
 
     long start = System.currentTimeMillis();
@@ -428,27 +428,16 @@ public class SerialGatewaySenderQueue implements RegionQueue {
           timeToWait);
     }
 
-    List<AsyncEvent> batch =
-        new ArrayList<AsyncEvent>(size == BATCH_BASED_ON_TIME_ONLY ? DEFAULT_BATCH_SIZE : size);
-    Set<TransactionId> incompleteTransactionsInBatch = new HashSet<>();
+    List<AsyncEvent<?, ?>> batch =
+        new ArrayList<>(size == BATCH_BASED_ON_TIME_ONLY ? DEFAULT_BATCH_SIZE : size);
     long lastKey = -1;
     while (size == BATCH_BASED_ON_TIME_ONLY || batch.size() < size) {
       KeyAndEventPair pair = peekAhead();
       // Conflate here
       if (pair != null) {
-        AsyncEvent object = pair.event;
+        AsyncEvent<?, ?> object = pair.event;
         lastKey = pair.key;
         batch.add(object);
-        if (object instanceof GatewaySenderEventImpl) {
-          GatewaySenderEventImpl event = (GatewaySenderEventImpl) object;
-          if (event.getTransactionId() != null) {
-            if (event.isLastEventInTransaction()) {
-              incompleteTransactionsInBatch.remove(event.getTransactionId());
-            } else {
-              incompleteTransactionsInBatch.add(event.getTransactionId());
-            }
-          }
-        }
       } else {
         // If time to wait is -1 (don't wait) or time interval has elapsed
         long currentTime = System.currentTimeMillis();
@@ -476,7 +465,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
       }
     }
     if (batch.size() > 0) {
-      peekEventsFromIncompleteTransactions(batch, incompleteTransactionsInBatch, lastKey);
+      peekEventsFromIncompleteTransactions(batch, lastKey);
     }
 
     if (isTraceEnabled) {
@@ -487,13 +476,13 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     // so no need to worry about off-heap refCount.
   }
 
-  private void peekEventsFromIncompleteTransactions(List<AsyncEvent> batch,
-      Set<TransactionId> incompleteTransactionIdsInBatch, long lastKey) {
+  private void peekEventsFromIncompleteTransactions(List<AsyncEvent<?, ?>> batch, long lastKey) {
     if (!mustGroupTransactionEvents()) {
       return;
     }
 
-    if (areAllTransactionsCompleteInBatch(incompleteTransactionIdsInBatch)) {
+    Set<TransactionId> incompleteTransactionIdsInBatch = getIncompleteTransactionsInBatch(batch);
+    if (incompleteTransactionIdsInBatch.size() == 0) {
       return;
     }
 
@@ -530,8 +519,21 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     return sender.mustGroupTransactionEvents();
   }
 
-  private boolean areAllTransactionsCompleteInBatch(Set incompleteTransactions) {
-    return (incompleteTransactions.size() == 0);
+  private Set<TransactionId> getIncompleteTransactionsInBatch(List<AsyncEvent<?, ?>> batch) {
+    Set<TransactionId> incompleteTransactionsInBatch = new HashSet<>();
+    for (Object object : batch) {
+      if (object instanceof GatewaySenderEventImpl) {
+        GatewaySenderEventImpl event = (GatewaySenderEventImpl) object;
+        if (event.getTransactionId() != null) {
+          if (event.isLastEventInTransaction()) {
+            incompleteTransactionsInBatch.remove(event.getTransactionId());
+          } else {
+            incompleteTransactionsInBatch.add(event.getTransactionId());
+          }
+        }
+      }
+    }
+    return incompleteTransactionsInBatch;
   }
 
   @Override
@@ -557,9 +559,9 @@ public class SerialGatewaySenderQueue implements RegionQueue {
   public void removeCacheListener() {
     AttributesMutator mutator = this.region.getAttributesMutator();
     CacheListener[] listeners = this.region.getAttributes().getCacheListeners();
-    for (int i = 0; i < listeners.length; i++) {
-      if (listeners[i] instanceof SerialSecondaryGatewayListener) {
-        mutator.removeCacheListener(listeners[i]);
+    for (CacheListener listener : listeners) {
+      if (listener instanceof SerialSecondaryGatewayListener) {
+        mutator.removeCacheListener(listener);
         break;
       }
     }
@@ -591,7 +593,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
       try {
         Map<Object, Long> latestIndexesForRegion = this.indexes.get(rName);
         if (latestIndexesForRegion == null) {
-          latestIndexesForRegion = new HashMap<Object, Long>();
+          latestIndexesForRegion = new HashMap<>();
           this.indexes.put(rName, latestIndexesForRegion);
         }
 
@@ -664,7 +666,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     Object o = null;
     try {
       o = lr.getValueInVMOrDiskWithoutFaultIn(k);
-      if (o != null && o instanceof CachedDeserializable) {
+      if (o instanceof CachedDeserializable) {
         o = ((CachedDeserializable) o).getDeserializedValue(lr, lr.getRegionEntry(k));
       }
     } catch (EntryNotFoundException ok) {
@@ -842,7 +844,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
 
   private EventsAndLastKey peekEventsWithTransactionId(TransactionId transactionId, long lastKey) {
     Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate =
-        x -> x.getTransactionId().equals(transactionId);
+        x -> transactionId.equals(x.getTransactionId());
     Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate =
         x -> x.isLastEventInTransaction();
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
index 15906b1..4279832 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
@@ -37,8 +37,8 @@ import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.cache.wan.GatewaySenderStats;
-import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
 import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderHelper;
+import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
 import org.apache.geode.internal.statistics.DummyStatisticsFactory;
 import org.apache.geode.test.fake.Fakes;
 
@@ -138,38 +138,39 @@ public class BucketRegionQueueJUnitTest {
   }
 
   @Test
-  public void testGetElementsMatchingWithHasTransactionIdPredicateAndIsLastEventInTransactionPredicate()
+  public void testGetElementsMatchingWithParallelGatewaySenderQueuePredicatesAndSomeEventsNotInTransactions()
       throws ForceReattemptException {
-    ParallelGatewaySenderEventProcessor processor =
-        ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
+    ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
 
     TransactionId tx1 = new TXId(null, 1);
     TransactionId tx2 = new TXId(null, 2);
     TransactionId tx3 = new TXId(null, 3);
 
     GatewaySenderEventImpl event1 = createMockGatewaySenderEvent(1, tx1, false);
-    GatewaySenderEventImpl event2 = createMockGatewaySenderEvent(2, tx2, false);
-    GatewaySenderEventImpl event3 = createMockGatewaySenderEvent(3, tx1, true);
-    GatewaySenderEventImpl event4 = createMockGatewaySenderEvent(4, tx2, true);
-    GatewaySenderEventImpl event5 = createMockGatewaySenderEvent(5, tx3, false);
-    GatewaySenderEventImpl event6 = createMockGatewaySenderEvent(6, tx3, false);
-    GatewaySenderEventImpl event7 = createMockGatewaySenderEvent(7, tx1, true);
+    GatewaySenderEventImpl eventNotInTransaction1 = createMockGatewaySenderEvent(2, null, false);
+    GatewaySenderEventImpl event2 = createMockGatewaySenderEvent(3, tx2, false);
+    GatewaySenderEventImpl event3 = createMockGatewaySenderEvent(4, tx1, true);
+    GatewaySenderEventImpl event4 = createMockGatewaySenderEvent(5, tx2, true);
+    GatewaySenderEventImpl event5 = createMockGatewaySenderEvent(6, tx3, false);
+    GatewaySenderEventImpl event6 = createMockGatewaySenderEvent(7, tx3, false);
+    GatewaySenderEventImpl event7 = createMockGatewaySenderEvent(8, tx1, true);
 
     this.bucketRegionQueue
         .cleanUpDestroyedTokensAndMarkGIIComplete(InitialImageOperation.GIIStatus.NO_GII);
 
-    this.bucketRegionQueue.addToQueue(Long.valueOf(1), event1);
-    this.bucketRegionQueue.addToQueue(Long.valueOf(2), event2);
-    this.bucketRegionQueue.addToQueue(Long.valueOf(3), event3);
-    this.bucketRegionQueue.addToQueue(Long.valueOf(4), event4);
-    this.bucketRegionQueue.addToQueue(Long.valueOf(5), event5);
-    this.bucketRegionQueue.addToQueue(Long.valueOf(6), event6);
-    this.bucketRegionQueue.addToQueue(Long.valueOf(7), event7);
+    this.bucketRegionQueue.addToQueue(1L, event1);
+    this.bucketRegionQueue.addToQueue(2L, eventNotInTransaction1);
+    this.bucketRegionQueue.addToQueue(3L, event2);
+    this.bucketRegionQueue.addToQueue(4L, event3);
+    this.bucketRegionQueue.addToQueue(5L, event4);
+    this.bucketRegionQueue.addToQueue(6L, event5);
+    this.bucketRegionQueue.addToQueue(7L, event6);
+    this.bucketRegionQueue.addToQueue(8L, event7);
 
     Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate =
-        x -> x.getTransactionId().equals(tx1);
+        ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx1);
     Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate =
-        x -> x.isLastEventInTransaction();
+        ParallelGatewaySenderQueue.getIsLastEventInTransactionPredicate();
     List<Object> objects = this.bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
         isLastEventInTransactionPredicate);
 
@@ -182,7 +183,7 @@ public class BucketRegionQueueJUnitTest {
     assertEquals(objects, Arrays.asList(new Object[] {event7}));
 
     hasTransactionIdPredicate =
-        x -> x.getTransactionId().equals(tx2);
+        ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx2);
     objects = this.bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
         isLastEventInTransactionPredicate);
     assertEquals(2, objects.size());
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
index 7423dbc..67ef206 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
@@ -17,6 +17,8 @@ package org.apache.geode.internal.cache.wan.parallel;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 
 import junitparams.JUnitParamsRunner;
@@ -33,6 +35,11 @@ import org.apache.geode.cache.client.ServerOperationException;
 import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.execute.data.CustId;
+import org.apache.geode.internal.cache.execute.data.Order;
+import org.apache.geode.internal.cache.execute.data.OrderId;
+import org.apache.geode.internal.cache.execute.data.Shipment;
+import org.apache.geode.internal.cache.execute.data.ShipmentId;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.BatchException70;
 import org.apache.geode.internal.cache.wan.WANTestBase;
@@ -55,7 +62,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
   @Test
   public void test_ParallelGatewaySenderMetaRegionNotExposedToUser_Bug44216() {
     Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+    vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
 
     createCache(lnPort);
     createSender("ln", 2, true, 100, 300, false, false, null, true);
@@ -77,10 +84,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     }
 
     GemFireCacheImpl gemCache = (GemFireCacheImpl) cache;
-    Set regionSet = gemCache.rootRegions();
+    Set<?> regionSet = gemCache.rootRegions();
 
     for (Object r : regionSet) {
-      if (((Region) r).getName()
+      if (((Region<?, ?>) r).getName()
           .equals(((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0]
               .getRegion().getName())) {
         fail("The shadowPR is exposed to the user");
@@ -683,18 +690,18 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm2.invoke(createReceiverPartitionedRegionRedundancy1());
     vm3.invoke(createReceiverPartitionedRegionRedundancy1());
 
-    AsyncInvocation inv1 =
+    AsyncInvocation<Void> inv1 =
         vm7.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 5000));
     Wait.pause(500);
-    AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
-    AsyncInvocation inv3 =
+    AsyncInvocation<Void> inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
+    AsyncInvocation<Void> inv3 =
         vm6.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 10000));
     Wait.pause(1500);
-    AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender());
-    inv1.join();
-    inv2.join();
-    inv3.join();
-    inv4.join();
+    AsyncInvocation<Void> inv4 = vm5.invokeAsync(() -> WANTestBase.killSender());
+    inv1.await();
+    inv2.await();
+    inv3.await();
+    inv4.await();
 
     vm6.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10000));
     vm7.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10000));
@@ -1047,7 +1054,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "PARENT_PR", null, 0,
         100, isOffHeap(), shortcut));
     String parentRegionFullPath =
-        (String) vm3.invoke(() -> WANTestBase.getRegionFullPath(getTestMethodName() + "PARENT_PR"));
+        vm3.invoke(() -> WANTestBase.getRegionFullPath(getTestMethodName() + "PARENT_PR"));
 
     // create colocated (child) PR on site1
     vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegion(getTestMethodName() + "CHILD_PR",
@@ -1235,6 +1242,112 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
   }
 
+  @Test
+  public void testPartitionedParallelPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions()
+      throws Exception {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> setNumDispatcherThreadsForTheRun(2));
+    vm5.invoke(() -> setNumDispatcherThreadsForTheRun(2));
+    vm6.invoke(() -> setNumDispatcherThreadsForTheRun(2));
+    vm7.invoke(() -> setNumDispatcherThreadsForTheRun(2));
+
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true));
+    vm6.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true));
+    vm7.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true));
+
+    vm4.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10,
+            isOffHeap()));
+    vm5.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10,
+            isOffHeap()));
+    vm6.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10,
+            isOffHeap()));
+    vm7.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10,
+            isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap()));
+    vm3.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap()));
+
+    int customers = 4;
+
+    int transactionsPerCustomer = 1000;
+    final Map<Object, Object> keyValuesInTransactions = new HashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+        ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+        ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+        keyValuesInTransactions.put(orderId, new Order());
+        keyValuesInTransactions.put(shipmentId1, new Shipment());
+        keyValuesInTransactions.put(shipmentId2, new Shipment());
+        keyValuesInTransactions.put(shipmentId3, new Shipment());
+      }
+    }
+
+    int ordersPerCustomerNotInTransactions = 1000;
+
+    final Map<Object, Object> keyValuesNotInTransactions = new HashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < ordersPerCustomerNotInTransactions; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i + transactionsPerCustomer * customers, custIdObject);
+        keyValuesNotInTransactions.put(orderId, new Order());
+      }
+    }
+
+    // eventsPerTransaction is 1 (orders) + 3 (shipments)
+    int eventsPerTransaction = 4;
+    AsyncInvocation<Void> inv1 =
+        vm7.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    AsyncInvocation<Void> inv2 =
+        vm6.invokeAsync(
+            () -> WANTestBase.putGivenKeyValue(orderRegionName, keyValuesNotInTransactions));
+
+    inv1.await();
+    inv2.await();
+
+    int entries =
+        ordersPerCustomerNotInTransactions * customers + transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm6.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm7.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
+    vm5.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
+    vm6.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
+    vm7.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
+
+    vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+  }
 
   private static RegionShortcut[] getRegionShortcuts() {
     return new RegionShortcut[] {RegionShortcut.PARTITION, RegionShortcut.PARTITION_PERSISTENT};
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
index 59b5b7a..5604a96 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
@@ -14,12 +14,20 @@
  */
 package org.apache.geode.internal.cache.wan.serial;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.CancelException;
 import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.internal.cache.ForceReattemptException;
+import org.apache.geode.internal.cache.execute.data.CustId;
+import org.apache.geode.internal.cache.execute.data.Order;
+import org.apache.geode.internal.cache.execute.data.OrderId;
+import org.apache.geode.internal.cache.execute.data.Shipment;
+import org.apache.geode.internal.cache.execute.data.ShipmentId;
 import org.apache.geode.internal.cache.wan.WANTestBase;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.IgnoredException;
@@ -35,10 +43,10 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
   }
 
   @Test
-  public void testPartitionedSerialPropagation() throws Exception {
+  public void testPartitionedSerialPropagation() {
 
-    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
     createReceiverInVMs(vm2, vm3);
@@ -71,10 +79,10 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
   }
 
   @Test
-  public void testBothReplicatedAndPartitionedSerialPropagation() throws Exception {
+  public void testBothReplicatedAndPartitionedSerialPropagation() {
 
-    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
     createReceiverInVMs(vm2, vm3);
@@ -122,10 +130,10 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
   }
 
   @Test
-  public void testSerialReplicatedAndPartitionedPropagation() throws Exception {
+  public void testSerialReplicatedAndPartitionedPropagation() {
 
-    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
     createReceiverInVMs(vm2, vm3);
@@ -176,10 +184,10 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
   }
 
   @Test
-  public void testSerialReplicatedAndSerialPartitionedPropagation() throws Exception {
+  public void testSerialReplicatedAndSerialPartitionedPropagation() {
 
-    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
     createReceiverInVMs(vm2, vm3);
@@ -236,11 +244,11 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
   }
 
   @Test
-  public void testPartitionedSerialPropagationToTwoWanSites() throws Exception {
+  public void testPartitionedSerialPropagationToTwoWanSites() {
 
     Integer lnPort = createFirstLocatorWithDSId(1);
-    Integer nyPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
-    Integer tkPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3, lnPort));
+    Integer nyPort = vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+    Integer tkPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3, lnPort));
 
     createCacheInVMs(nyPort, vm2);
     vm2.invoke(() -> WANTestBase.createReceiver());
@@ -288,8 +296,8 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
     IgnoredException.addIgnoredException("Connection reset");
     IgnoredException.addIgnoredException("Unexpected IOException");
 
-    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
     createReceiverInVMs(vm2, vm3);
@@ -322,12 +330,12 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
     IgnoredException.addIgnoredException(CacheClosedException.class.getName());
     IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
     // start async puts
-    AsyncInvocation inv =
+    AsyncInvocation<Void> inv =
         vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000));
     // close the cache on vm4 in between the puts
     vm4.invoke(() -> WANTestBase.killSender());
 
-    inv.join();
+    inv.await();
     vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
     vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
     vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
@@ -335,10 +343,10 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
   }
 
   @Test
-  public void testPartitionedSerialPropagationWithParallelThreads() throws Exception {
+  public void testPartitionedSerialPropagationWithParallelThreads() {
 
-    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
     createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -371,4 +379,112 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
     vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
     vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
   }
+
+  @Test
+  public void testPartitionedSerialPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions()
+      throws Exception {
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> setNumDispatcherThreadsForTheRun(1));
+    vm5.invoke(() -> setNumDispatcherThreadsForTheRun(1));
+    vm6.invoke(() -> setNumDispatcherThreadsForTheRun(1));
+    vm7.invoke(() -> setNumDispatcherThreadsForTheRun(1));
+
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true, true));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true, true));
+    vm6.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true, true));
+    vm7.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true, true));
+
+
+    vm4.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10,
+            isOffHeap()));
+    vm5.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10,
+            isOffHeap()));
+    vm6.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10,
+            isOffHeap()));
+    vm7.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10,
+            isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap()));
+    vm3.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap()));
+
+    int customers = 4;
+
+    int transactionsPerCustomer = 1000;
+    final Map<Object, Object> keyValuesInTransactions = new HashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+        ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+        ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+        keyValuesInTransactions.put(orderId, new Order());
+        keyValuesInTransactions.put(shipmentId1, new Shipment());
+        keyValuesInTransactions.put(shipmentId2, new Shipment());
+        keyValuesInTransactions.put(shipmentId3, new Shipment());
+      }
+    }
+
+    int ordersPerCustomerNotInTransactions = 1000;
+
+    final Map<Object, Object> keyValuesNotInTransactions = new HashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < ordersPerCustomerNotInTransactions; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i + transactionsPerCustomer * customers, custIdObject);
+        keyValuesNotInTransactions.put(orderId, new Order());
+      }
+    }
+
+    // eventsPerTransaction is 1 (orders) + 3 (shipments)
+    int eventsPerTransaction = 4;
+    AsyncInvocation<Void> inv1 =
+        vm7.invokeAsync(
+            () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    AsyncInvocation<Void> inv2 =
+        vm6.invokeAsync(
+            () -> WANTestBase.putGivenKeyValue(orderRegionName, keyValuesNotInTransactions));
+
+    inv1.await();
+    inv2.await();
+
+    int entries =
+        ordersPerCustomerNotInTransactions * customers + transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm6.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm7.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
+    vm5.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
+    vm6.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
+    vm7.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
+
+    vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+  }
 }