You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mk...@apache.org on 2020/12/14 09:57:34 UTC
[geode] branch develop updated: GEODE-8765: Fix NullPointerException when group-transaction-events an… (#5829)
This is an automated email from the ASF dual-hosted git repository.
mkevo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 4c00984 GEODE-8765: Fix NullPointerException when group-transaction-events an… (#5829)
4c00984 is described below
commit 4c00984eb3c32972b4b03fdbe2b6397decdc177f
Author: Alberto Gomez <al...@est.tech>
AuthorDate: Mon Dec 14 10:56:31 2020 +0100
GEODE-8765: Fix NullPointerException when group-transaction-events an… (#5829)
* GEODE-8765: Fix NullPointerException when group-transaction-events is enabled and a mix of events inside and outside of transactions is sent.
---
.../wan/parallel/ParallelGatewaySenderQueue.java | 60 +++++---
.../cache/wan/serial/SerialGatewaySenderQueue.java | 58 ++++----
.../internal/cache/BucketRegionQueueJUnitTest.java | 41 +++---
.../parallel/ParallelWANPropagationDUnitTest.java | 137 ++++++++++++++++--
...lWANPropagation_PartitionedRegionDUnitTest.java | 160 ++++++++++++++++++---
5 files changed, 351 insertions(+), 105 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 18c1624..2100d54 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1260,7 +1260,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
addPeekedEvents(batch, batchSize == BATCH_BASED_ON_TIME_ONLY ? DEFAULT_BATCH_SIZE : batchSize);
int bId;
- Map<TransactionId, Integer> incompleteTransactionsInBatch = new HashMap<>();
while (batchSize == BATCH_BASED_ON_TIME_ONLY || batch.size() < batchSize) {
if (areLocalBucketQueueRegionsPresent() && ((bId = getRandomPrimaryBucket(prQ)) != -1)) {
GatewaySenderEventImpl object = (GatewaySenderEventImpl) peekAhead(prQ, bId);
@@ -1280,13 +1279,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
logger.debug("The gatewayEventImpl in peek is {}", object);
}
batch.add(object);
- if (object.getTransactionId() != null) {
- if (object.isLastEventInTransaction()) {
- incompleteTransactionsInBatch.remove(object.getTransactionId());
- } else {
- incompleteTransactionsInBatch.put(object.getTransactionId(), bId);
- }
- }
peekedEvents.add(object);
} else {
@@ -1316,7 +1308,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
if (batch.size() > 0) {
- peekEventsFromIncompleteTransactions(batch, incompleteTransactionsInBatch, prQ);
+ peekEventsFromIncompleteTransactions(batch, prQ);
}
if (isDebugEnabled) {
@@ -1351,12 +1343,14 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
private void peekEventsFromIncompleteTransactions(List<GatewaySenderEventImpl> batch,
- Map<TransactionId, Integer> incompleteTransactionIdsInBatch, PartitionedRegion prQ) {
+ PartitionedRegion prQ) {
if (!mustGroupTransactionEvents()) {
return;
}
- if (areAllTransactionsCompleteInBatch(incompleteTransactionIdsInBatch)) {
+ Map<TransactionId, Integer> incompleteTransactionIdsInBatch =
+ getIncompleteTransactionsInBatch(batch);
+ if (incompleteTransactionIdsInBatch.size() == 0) {
return;
}
@@ -1389,8 +1383,19 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
}
- private boolean areAllTransactionsCompleteInBatch(Map incompleteTransactions) {
- return (incompleteTransactions.size() == 0);
+ private Map<TransactionId, Integer> getIncompleteTransactionsInBatch(
+ List<GatewaySenderEventImpl> batch) {
+ Map<TransactionId, Integer> incompleteTransactionsInBatch = new HashMap<>();
+ for (GatewaySenderEventImpl event : batch) {
+ if (event.getTransactionId() != null) {
+ if (event.isLastEventInTransaction()) {
+ incompleteTransactionsInBatch.remove(event.getTransactionId());
+ } else {
+ incompleteTransactionsInBatch.put(event.getTransactionId(), event.getBucketId());
+ }
+ }
+ }
+ return incompleteTransactionsInBatch;
}
@VisibleForTesting
@@ -1472,19 +1477,18 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
for (int i = 0; i < batchSize || incompleteTransactionsInBatch.size() != 0; i++) {
GatewaySenderEventImpl event = this.peekedEventsProcessing.remove();
batch.add(event);
- if (event.getTransactionId() != null) {
- if (event.isLastEventInTransaction()) {
- incompleteTransactionsInBatch.remove(event.getTransactionId());
- } else {
- incompleteTransactionsInBatch.add(event.getTransactionId());
+ if (mustGroupTransactionEvents()) {
+ if (event.getTransactionId() != null) {
+ if (event.isLastEventInTransaction()) {
+ incompleteTransactionsInBatch.remove(event.getTransactionId());
+ } else {
+ incompleteTransactionsInBatch.add(event.getTransactionId());
+ }
}
}
if (this.peekedEventsProcessing.isEmpty()) {
this.resetLastPeeked = false;
this.peekedEventsProcessingInProgress = false;
- if (incompleteTransactionsInBatch.size() != 0) {
- logger.error("A batch with incomplete transactions has been sent.");
- }
break;
}
}
@@ -1547,9 +1551,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
try {
Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate =
- x -> x.getTransactionId().equals(transactionId);
+ getHasTransactionIdPredicate(transactionId);
Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate =
- x -> x.isLastEventInTransaction();
+ getIsLastEventInTransactionPredicate();
objects =
brq.getElementsMatching(hasTransactionIdPredicate, isLastEventInTransactionPredicate);
} catch (BucketRegionQueueUnavailableException e) {
@@ -1561,6 +1565,16 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
// finished with peeked objects.
}
+ @VisibleForTesting
+ public static Predicate<GatewaySenderEventImpl> getIsLastEventInTransactionPredicate() {
+ return x -> x.isLastEventInTransaction();
+ }
+
+ @VisibleForTesting
+ public static Predicate<GatewaySenderEventImpl> getHasTransactionIdPredicate(
+ TransactionId transactionId) {
+ return x -> transactionId.equals(x.getTransactionId());
+ }
protected BucketRegionQueue getBucketRegionQueueByBucketId(final PartitionedRegion prQ,
final int bucketId) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 6b57fa0..192af19 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -413,12 +413,12 @@ public class SerialGatewaySenderQueue implements RegionQueue {
}
@Override
- public List<AsyncEvent> peek(int size) throws CacheException {
+ public List<AsyncEvent<?, ?>> peek(int size) throws CacheException {
return peek(size, -1);
}
@Override
- public List<AsyncEvent> peek(int size, int timeToWait) throws CacheException {
+ public List<AsyncEvent<?, ?>> peek(int size, int timeToWait) throws CacheException {
final boolean isTraceEnabled = logger.isTraceEnabled();
long start = System.currentTimeMillis();
@@ -428,27 +428,16 @@ public class SerialGatewaySenderQueue implements RegionQueue {
timeToWait);
}
- List<AsyncEvent> batch =
- new ArrayList<AsyncEvent>(size == BATCH_BASED_ON_TIME_ONLY ? DEFAULT_BATCH_SIZE : size);
- Set<TransactionId> incompleteTransactionsInBatch = new HashSet<>();
+ List<AsyncEvent<?, ?>> batch =
+ new ArrayList<>(size == BATCH_BASED_ON_TIME_ONLY ? DEFAULT_BATCH_SIZE : size);
long lastKey = -1;
while (size == BATCH_BASED_ON_TIME_ONLY || batch.size() < size) {
KeyAndEventPair pair = peekAhead();
// Conflate here
if (pair != null) {
- AsyncEvent object = pair.event;
+ AsyncEvent<?, ?> object = pair.event;
lastKey = pair.key;
batch.add(object);
- if (object instanceof GatewaySenderEventImpl) {
- GatewaySenderEventImpl event = (GatewaySenderEventImpl) object;
- if (event.getTransactionId() != null) {
- if (event.isLastEventInTransaction()) {
- incompleteTransactionsInBatch.remove(event.getTransactionId());
- } else {
- incompleteTransactionsInBatch.add(event.getTransactionId());
- }
- }
- }
} else {
// If time to wait is -1 (don't wait) or time interval has elapsed
long currentTime = System.currentTimeMillis();
@@ -476,7 +465,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
}
}
if (batch.size() > 0) {
- peekEventsFromIncompleteTransactions(batch, incompleteTransactionsInBatch, lastKey);
+ peekEventsFromIncompleteTransactions(batch, lastKey);
}
if (isTraceEnabled) {
@@ -487,13 +476,13 @@ public class SerialGatewaySenderQueue implements RegionQueue {
// so no need to worry about off-heap refCount.
}
- private void peekEventsFromIncompleteTransactions(List<AsyncEvent> batch,
- Set<TransactionId> incompleteTransactionIdsInBatch, long lastKey) {
+ private void peekEventsFromIncompleteTransactions(List<AsyncEvent<?, ?>> batch, long lastKey) {
if (!mustGroupTransactionEvents()) {
return;
}
- if (areAllTransactionsCompleteInBatch(incompleteTransactionIdsInBatch)) {
+ Set<TransactionId> incompleteTransactionIdsInBatch = getIncompleteTransactionsInBatch(batch);
+ if (incompleteTransactionIdsInBatch.size() == 0) {
return;
}
@@ -530,8 +519,21 @@ public class SerialGatewaySenderQueue implements RegionQueue {
return sender.mustGroupTransactionEvents();
}
- private boolean areAllTransactionsCompleteInBatch(Set incompleteTransactions) {
- return (incompleteTransactions.size() == 0);
+ private Set<TransactionId> getIncompleteTransactionsInBatch(List<AsyncEvent<?, ?>> batch) {
+ Set<TransactionId> incompleteTransactionsInBatch = new HashSet<>();
+ for (Object object : batch) {
+ if (object instanceof GatewaySenderEventImpl) {
+ GatewaySenderEventImpl event = (GatewaySenderEventImpl) object;
+ if (event.getTransactionId() != null) {
+ if (event.isLastEventInTransaction()) {
+ incompleteTransactionsInBatch.remove(event.getTransactionId());
+ } else {
+ incompleteTransactionsInBatch.add(event.getTransactionId());
+ }
+ }
+ }
+ }
+ return incompleteTransactionsInBatch;
}
@Override
@@ -557,9 +559,9 @@ public class SerialGatewaySenderQueue implements RegionQueue {
public void removeCacheListener() {
AttributesMutator mutator = this.region.getAttributesMutator();
CacheListener[] listeners = this.region.getAttributes().getCacheListeners();
- for (int i = 0; i < listeners.length; i++) {
- if (listeners[i] instanceof SerialSecondaryGatewayListener) {
- mutator.removeCacheListener(listeners[i]);
+ for (CacheListener listener : listeners) {
+ if (listener instanceof SerialSecondaryGatewayListener) {
+ mutator.removeCacheListener(listener);
break;
}
}
@@ -591,7 +593,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
try {
Map<Object, Long> latestIndexesForRegion = this.indexes.get(rName);
if (latestIndexesForRegion == null) {
- latestIndexesForRegion = new HashMap<Object, Long>();
+ latestIndexesForRegion = new HashMap<>();
this.indexes.put(rName, latestIndexesForRegion);
}
@@ -664,7 +666,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
Object o = null;
try {
o = lr.getValueInVMOrDiskWithoutFaultIn(k);
- if (o != null && o instanceof CachedDeserializable) {
+ if (o instanceof CachedDeserializable) {
o = ((CachedDeserializable) o).getDeserializedValue(lr, lr.getRegionEntry(k));
}
} catch (EntryNotFoundException ok) {
@@ -842,7 +844,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
private EventsAndLastKey peekEventsWithTransactionId(TransactionId transactionId, long lastKey) {
Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate =
- x -> x.getTransactionId().equals(transactionId);
+ x -> transactionId.equals(x.getTransactionId());
Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate =
x -> x.isLastEventInTransaction();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
index 15906b1..4279832 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
@@ -37,8 +37,8 @@ import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderStats;
-import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderHelper;
+import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
import org.apache.geode.internal.statistics.DummyStatisticsFactory;
import org.apache.geode.test.fake.Fakes;
@@ -138,38 +138,39 @@ public class BucketRegionQueueJUnitTest {
}
@Test
- public void testGetElementsMatchingWithHasTransactionIdPredicateAndIsLastEventInTransactionPredicate()
+ public void testGetElementsMatchingWithParallelGatewaySenderQueuePredicatesAndSomeEventsNotInTransactions()
throws ForceReattemptException {
- ParallelGatewaySenderEventProcessor processor =
- ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
+ ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
TransactionId tx1 = new TXId(null, 1);
TransactionId tx2 = new TXId(null, 2);
TransactionId tx3 = new TXId(null, 3);
GatewaySenderEventImpl event1 = createMockGatewaySenderEvent(1, tx1, false);
- GatewaySenderEventImpl event2 = createMockGatewaySenderEvent(2, tx2, false);
- GatewaySenderEventImpl event3 = createMockGatewaySenderEvent(3, tx1, true);
- GatewaySenderEventImpl event4 = createMockGatewaySenderEvent(4, tx2, true);
- GatewaySenderEventImpl event5 = createMockGatewaySenderEvent(5, tx3, false);
- GatewaySenderEventImpl event6 = createMockGatewaySenderEvent(6, tx3, false);
- GatewaySenderEventImpl event7 = createMockGatewaySenderEvent(7, tx1, true);
+ GatewaySenderEventImpl eventNotInTransaction1 = createMockGatewaySenderEvent(2, null, false);
+ GatewaySenderEventImpl event2 = createMockGatewaySenderEvent(3, tx2, false);
+ GatewaySenderEventImpl event3 = createMockGatewaySenderEvent(4, tx1, true);
+ GatewaySenderEventImpl event4 = createMockGatewaySenderEvent(5, tx2, true);
+ GatewaySenderEventImpl event5 = createMockGatewaySenderEvent(6, tx3, false);
+ GatewaySenderEventImpl event6 = createMockGatewaySenderEvent(7, tx3, false);
+ GatewaySenderEventImpl event7 = createMockGatewaySenderEvent(8, tx1, true);
this.bucketRegionQueue
.cleanUpDestroyedTokensAndMarkGIIComplete(InitialImageOperation.GIIStatus.NO_GII);
- this.bucketRegionQueue.addToQueue(Long.valueOf(1), event1);
- this.bucketRegionQueue.addToQueue(Long.valueOf(2), event2);
- this.bucketRegionQueue.addToQueue(Long.valueOf(3), event3);
- this.bucketRegionQueue.addToQueue(Long.valueOf(4), event4);
- this.bucketRegionQueue.addToQueue(Long.valueOf(5), event5);
- this.bucketRegionQueue.addToQueue(Long.valueOf(6), event6);
- this.bucketRegionQueue.addToQueue(Long.valueOf(7), event7);
+ this.bucketRegionQueue.addToQueue(1L, event1);
+ this.bucketRegionQueue.addToQueue(2L, eventNotInTransaction1);
+ this.bucketRegionQueue.addToQueue(3L, event2);
+ this.bucketRegionQueue.addToQueue(4L, event3);
+ this.bucketRegionQueue.addToQueue(5L, event4);
+ this.bucketRegionQueue.addToQueue(6L, event5);
+ this.bucketRegionQueue.addToQueue(7L, event6);
+ this.bucketRegionQueue.addToQueue(8L, event7);
Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate =
- x -> x.getTransactionId().equals(tx1);
+ ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx1);
Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate =
- x -> x.isLastEventInTransaction();
+ ParallelGatewaySenderQueue.getIsLastEventInTransactionPredicate();
List<Object> objects = this.bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
isLastEventInTransactionPredicate);
@@ -182,7 +183,7 @@ public class BucketRegionQueueJUnitTest {
assertEquals(objects, Arrays.asList(new Object[] {event7}));
hasTransactionIdPredicate =
- x -> x.getTransactionId().equals(tx2);
+ ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx2);
objects = this.bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
isLastEventInTransactionPredicate);
assertEquals(2, objects.size());
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
index 7423dbc..67ef206 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
@@ -17,6 +17,8 @@ package org.apache.geode.internal.cache.wan.parallel;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
import junitparams.JUnitParamsRunner;
@@ -33,6 +35,11 @@ import org.apache.geode.cache.client.ServerOperationException;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.execute.data.CustId;
+import org.apache.geode.internal.cache.execute.data.Order;
+import org.apache.geode.internal.cache.execute.data.OrderId;
+import org.apache.geode.internal.cache.execute.data.Shipment;
+import org.apache.geode.internal.cache.execute.data.ShipmentId;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.BatchException70;
import org.apache.geode.internal.cache.wan.WANTestBase;
@@ -55,7 +62,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
@Test
public void test_ParallelGatewaySenderMetaRegionNotExposedToUser_Bug44216() {
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+ vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCache(lnPort);
createSender("ln", 2, true, 100, 300, false, false, null, true);
@@ -77,10 +84,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
}
GemFireCacheImpl gemCache = (GemFireCacheImpl) cache;
- Set regionSet = gemCache.rootRegions();
+ Set<?> regionSet = gemCache.rootRegions();
for (Object r : regionSet) {
- if (((Region) r).getName()
+ if (((Region<?, ?>) r).getName()
.equals(((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0]
.getRegion().getName())) {
fail("The shadowPR is exposed to the user");
@@ -683,18 +690,18 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm2.invoke(createReceiverPartitionedRegionRedundancy1());
vm3.invoke(createReceiverPartitionedRegionRedundancy1());
- AsyncInvocation inv1 =
+ AsyncInvocation<Void> inv1 =
vm7.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 5000));
Wait.pause(500);
- AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
- AsyncInvocation inv3 =
+ AsyncInvocation<Void> inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
+ AsyncInvocation<Void> inv3 =
vm6.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 10000));
Wait.pause(1500);
- AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender());
- inv1.join();
- inv2.join();
- inv3.join();
- inv4.join();
+ AsyncInvocation<Void> inv4 = vm5.invokeAsync(() -> WANTestBase.killSender());
+ inv1.await();
+ inv2.await();
+ inv3.await();
+ inv4.await();
vm6.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10000));
vm7.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10000));
@@ -1047,7 +1054,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "PARENT_PR", null, 0,
100, isOffHeap(), shortcut));
String parentRegionFullPath =
- (String) vm3.invoke(() -> WANTestBase.getRegionFullPath(getTestMethodName() + "PARENT_PR"));
+ vm3.invoke(() -> WANTestBase.getRegionFullPath(getTestMethodName() + "PARENT_PR"));
// create colocated (child) PR on site1
vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegion(getTestMethodName() + "CHILD_PR",
@@ -1235,6 +1242,112 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
}
+ @Test
+ public void testPartitionedParallelPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions()
+ throws Exception {
+ Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> setNumDispatcherThreadsForTheRun(2));
+ vm5.invoke(() -> setNumDispatcherThreadsForTheRun(2));
+ vm6.invoke(() -> setNumDispatcherThreadsForTheRun(2));
+ vm7.invoke(() -> setNumDispatcherThreadsForTheRun(2));
+
+ vm4.invoke(
+ () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true));
+ vm5.invoke(
+ () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true));
+ vm6.invoke(
+ () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true));
+ vm7.invoke(
+ () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true));
+
+ vm4.invoke(
+ () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10,
+ isOffHeap()));
+ vm5.invoke(
+ () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10,
+ isOffHeap()));
+ vm6.invoke(
+ () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10,
+ isOffHeap()));
+ vm7.invoke(
+ () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10,
+ isOffHeap()));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap()));
+ vm3.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap()));
+
+ int customers = 4;
+
+ int transactionsPerCustomer = 1000;
+ final Map<Object, Object> keyValuesInTransactions = new HashMap<>();
+ for (int custId = 0; custId < customers; custId++) {
+ for (int i = 0; i < transactionsPerCustomer; i++) {
+ CustId custIdObject = new CustId(custId);
+ OrderId orderId = new OrderId(i, custIdObject);
+ ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+ ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+ ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+ keyValuesInTransactions.put(orderId, new Order());
+ keyValuesInTransactions.put(shipmentId1, new Shipment());
+ keyValuesInTransactions.put(shipmentId2, new Shipment());
+ keyValuesInTransactions.put(shipmentId3, new Shipment());
+ }
+ }
+
+ int ordersPerCustomerNotInTransactions = 1000;
+
+ final Map<Object, Object> keyValuesNotInTransactions = new HashMap<>();
+ for (int custId = 0; custId < customers; custId++) {
+ for (int i = 0; i < ordersPerCustomerNotInTransactions; i++) {
+ CustId custIdObject = new CustId(custId);
+ OrderId orderId = new OrderId(i + transactionsPerCustomer * customers, custIdObject);
+ keyValuesNotInTransactions.put(orderId, new Order());
+ }
+ }
+
+ // eventsPerTransaction is 1 (orders) + 3 (shipments)
+ int eventsPerTransaction = 4;
+ AsyncInvocation<Void> inv1 =
+ vm7.invokeAsync(
+ () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+ eventsPerTransaction));
+
+ AsyncInvocation<Void> inv2 =
+ vm6.invokeAsync(
+ () -> WANTestBase.putGivenKeyValue(orderRegionName, keyValuesNotInTransactions));
+
+ inv1.await();
+ inv2.await();
+
+ int entries =
+ ordersPerCustomerNotInTransactions * customers + transactionsPerCustomer * customers;
+
+ vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+ vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+ vm6.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+ vm7.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+ vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
+ vm5.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
+ vm6.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
+ vm7.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
+
+ vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ }
private static RegionShortcut[] getRegionShortcuts() {
return new RegionShortcut[] {RegionShortcut.PARTITION, RegionShortcut.PARTITION_PERSISTENT};
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
index 59b5b7a..5604a96 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
@@ -14,12 +14,20 @@
*/
package org.apache.geode.internal.cache.wan.serial;
+import java.util.HashMap;
+import java.util.Map;
+
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.CancelException;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.internal.cache.ForceReattemptException;
+import org.apache.geode.internal.cache.execute.data.CustId;
+import org.apache.geode.internal.cache.execute.data.Order;
+import org.apache.geode.internal.cache.execute.data.OrderId;
+import org.apache.geode.internal.cache.execute.data.Shipment;
+import org.apache.geode.internal.cache.execute.data.ShipmentId;
import org.apache.geode.internal.cache.wan.WANTestBase;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.IgnoredException;
@@ -35,10 +43,10 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
}
@Test
- public void testPartitionedSerialPropagation() throws Exception {
+ public void testPartitionedSerialPropagation() {
- Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+ Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
@@ -71,10 +79,10 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
}
@Test
- public void testBothReplicatedAndPartitionedSerialPropagation() throws Exception {
+ public void testBothReplicatedAndPartitionedSerialPropagation() {
- Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+ Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
@@ -122,10 +130,10 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
}
@Test
- public void testSerialReplicatedAndPartitionedPropagation() throws Exception {
+ public void testSerialReplicatedAndPartitionedPropagation() {
- Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+ Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
@@ -176,10 +184,10 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
}
@Test
- public void testSerialReplicatedAndSerialPartitionedPropagation() throws Exception {
+ public void testSerialReplicatedAndSerialPartitionedPropagation() {
- Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+ Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
@@ -236,11 +244,11 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
}
@Test
- public void testPartitionedSerialPropagationToTwoWanSites() throws Exception {
+ public void testPartitionedSerialPropagationToTwoWanSites() {
Integer lnPort = createFirstLocatorWithDSId(1);
- Integer nyPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
- Integer tkPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3, lnPort));
+ Integer nyPort = vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+ Integer tkPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3, lnPort));
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> WANTestBase.createReceiver());
@@ -288,8 +296,8 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
IgnoredException.addIgnoredException("Connection reset");
IgnoredException.addIgnoredException("Unexpected IOException");
- Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+ Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
@@ -322,12 +330,12 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
IgnoredException.addIgnoredException(CacheClosedException.class.getName());
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
// start async puts
- AsyncInvocation inv =
+ AsyncInvocation<Void> inv =
vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000));
// close the cache on vm4 in between the puts
vm4.invoke(() -> WANTestBase.killSender());
- inv.join();
+ inv.await();
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
@@ -335,10 +343,10 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
}
@Test
- public void testPartitionedSerialPropagationWithParallelThreads() throws Exception {
+ public void testPartitionedSerialPropagationWithParallelThreads() {
- Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+ Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -371,4 +379,112 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
}
+
+ @Test
+ public void testPartitionedSerialPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions()
+ throws Exception { // puts orders inside and outside transactions concurrently, then checks the order count on both sites
+ Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); // locator with DS id 1, started in vm0
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); // remote locator with DS id 2, given lnPort
+
+ createCacheInVMs(nyPort, vm2, vm3); // vm2 and vm3 use the remote locator and host the receivers
+ createReceiverInVMs(vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); // vm4-vm7 use the local locator and host the senders
+
+ vm4.invoke(() -> setNumDispatcherThreadsForTheRun(1)); // NOTE(review): presumably one dispatcher thread per sender - confirm in WANTestBase
+ vm5.invoke(() -> setNumDispatcherThreadsForTheRun(1));
+ vm6.invoke(() -> setNumDispatcherThreadsForTheRun(1));
+ vm7.invoke(() -> setNumDispatcherThreadsForTheRun(1));
+
+ vm4.invoke(
+ () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true, true)); // NOTE(review): third arg presumably isParallel=false, last arg presumably group-transaction-events=true - confirm against createSender
+ vm5.invoke(
+ () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true, true));
+ vm6.invoke(
+ () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true, true));
+ vm7.invoke(
+ () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true, true));
+
+
+ vm4.invoke(
+ () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10,
+ isOffHeap())); // same region arguments are used on all four sending members
+ vm5.invoke(
+ () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10,
+ isOffHeap()));
+ vm6.invoke(
+ () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10,
+ isOffHeap()));
+ vm7.invoke(
+ () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10,
+ isOffHeap()));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap())); // NOTE(review): null first argument presumably means no sender attached on the receiving site - confirm
+ vm3.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap()));
+
+ int customers = 4;
+
+ int transactionsPerCustomer = 1000;
+ final Map<Object, Object> keyValuesInTransactions = new HashMap<>(); // orders and shipments to be put inside transactions
+ for (int custId = 0; custId < customers; custId++) {
+ for (int i = 0; i < transactionsPerCustomer; i++) {
+ CustId custIdObject = new CustId(custId);
+ OrderId orderId = new OrderId(i, custIdObject);
+ ShipmentId shipmentId1 = new ShipmentId(i, orderId); // three shipments are generated for every order
+ ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+ ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+ keyValuesInTransactions.put(orderId, new Order());
+ keyValuesInTransactions.put(shipmentId1, new Shipment());
+ keyValuesInTransactions.put(shipmentId2, new Shipment());
+ keyValuesInTransactions.put(shipmentId3, new Shipment());
+ }
+ }
+
+ int ordersPerCustomerNotInTransactions = 1000;
+
+ final Map<Object, Object> keyValuesNotInTransactions = new HashMap<>(); // orders to be put without a transaction
+ for (int custId = 0; custId < customers; custId++) {
+ for (int i = 0; i < ordersPerCustomerNotInTransactions; i++) {
+ CustId custIdObject = new CustId(custId);
+ OrderId orderId = new OrderId(i + transactionsPerCustomer * customers, custIdObject); // ids start above every order id used in the transactional map
+ keyValuesNotInTransactions.put(orderId, new Order());
+ }
+ }
+
+ // eventsPerTransaction is 1 (order) + 3 (shipments)
+ int eventsPerTransaction = 4;
+ AsyncInvocation<Void> inv1 =
+ vm7.invokeAsync(
+ () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+ eventsPerTransaction)); // transactional puts are issued from vm7
+
+ AsyncInvocation<Void> inv2 =
+ vm6.invokeAsync(
+ () -> WANTestBase.putGivenKeyValue(orderRegionName, keyValuesNotInTransactions)); // non-transactional puts are issued from vm6 while inv1 is in flight
+
+ inv1.await(); // both async invocations are awaited before any validation
+ inv2.await();
+
+ int entries =
+ ordersPerCustomerNotInTransactions * customers + transactionsPerCustomer * customers; // orders only: the validations below check orderRegionName, shipments are not counted
+
+ vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+ vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+ vm6.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+ vm7.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries)); // the same order count is expected on the receiving members
+ vm3.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+ vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", 0)); // NOTE(review): presumably asserts zero conflated events for sender "ln" - confirm checkConflatedStats
+ vm5.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
+ vm6.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
+ vm7.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
+
+ vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); // NOTE(review): helper name mentions a parallel sender queue; confirm it is valid for the senders created above
+ vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ }
}