You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by al...@apache.org on 2021/04/13 07:01:03 UTC
[geode] branch develop updated: GEODE-9122: Avoid possible ConcurrentModificationException with group-transaction-events=true (#6278)
This is an automated email from the ASF dual-hosted git repository.
alberto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 69834d5 GEODE-9122: Avoid possible ConcurrentModificationException with group-transaction-events=true (#6278)
69834d5 is described below
commit 69834d5e81875850b1dde759d04a485f8b3aa461
Author: Alberto Gomez <al...@est.tech>
AuthorDate: Tue Apr 13 08:59:58 2021 +0200
GEODE-9122: Avoid possible ConcurrentModificationException with group-transaction-events=true (#6278)
When group-transaction-events is set to true, a
ConcurrentModificationException can occur in
SerialGatewaySenderQueue.peekEventsFromIncompleteTransactions or
ParallelGatewaySenderQueue.peekEventsFromIncompleteTransactions if the
batch contains more than one incomplete TransactionId and one of them
is removed from the collection while it is being iterated.
---
.../wan/parallel/ParallelGatewaySenderQueue.java | 10 +++++----
.../cache/wan/serial/SerialGatewaySenderQueue.java | 10 ++++++---
.../ParallelGatewaySenderQueueJUnitTest.java | 26 ++++++++++++++++++++++
.../serial/SerialGatewaySenderQueueJUnitTest.java | 17 ++++++++++++++
.../geode/internal/cache/wan/WANTestBase.java | 3 +--
5 files changed, 57 insertions(+), 9 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 260db0f..5666b0d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1374,7 +1374,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
return sender.mustGroupTransactionEvents();
}
- private void peekEventsFromIncompleteTransactions(List<GatewaySenderEventImpl> batch,
+ @VisibleForTesting
+ void peekEventsFromIncompleteTransactions(List<GatewaySenderEventImpl> batch,
PartitionedRegion prQ) {
if (!mustGroupTransactionEvents()) {
return;
@@ -1388,8 +1389,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
int retries = 0;
while (true) {
- for (Map.Entry<TransactionId, Integer> pendingTransaction : incompleteTransactionIdsInBatch
- .entrySet()) {
+ for (Iterator<Map.Entry<TransactionId, Integer>> iter =
+ incompleteTransactionIdsInBatch.entrySet().iterator(); iter.hasNext();) {
+ Map.Entry<TransactionId, Integer> pendingTransaction = iter.next();
TransactionId transactionId = pendingTransaction.getKey();
int bucketId = pendingTransaction.getValue();
List<Object> events = peekEventsWithTransactionId(prQ, bucketId, transactionId);
@@ -1403,7 +1405,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
event.getKey(), bucketId, event.isLastEventInTransaction(), batch.size());
}
if (event.isLastEventInTransaction()) {
- incompleteTransactionIdsInBatch.remove(transactionId);
+ iter.remove();
}
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 89537e6..9e7d95d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -461,7 +462,8 @@ public class SerialGatewaySenderQueue implements RegionQueue {
// so no need to worry about off-heap refCount.
}
- private void peekEventsFromIncompleteTransactions(List<AsyncEvent<?, ?>> batch, long lastKey) {
+ @VisibleForTesting
+ void peekEventsFromIncompleteTransactions(List<AsyncEvent<?, ?>> batch, long lastKey) {
if (!mustGroupTransactionEvents()) {
return;
}
@@ -473,7 +475,9 @@ public class SerialGatewaySenderQueue implements RegionQueue {
int retries = 0;
while (true) {
- for (TransactionId transactionId : incompleteTransactionIdsInBatch) {
+ for (Iterator<TransactionId> iter = incompleteTransactionIdsInBatch.iterator(); iter
+ .hasNext();) {
+ TransactionId transactionId = iter.next();
List<KeyAndEventPair> keyAndEventPairs =
peekEventsWithTransactionId(transactionId, lastKey);
if (keyAndEventPairs.size() > 0
@@ -490,7 +494,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
event.getKey(), event.isLastEventInTransaction(), batch.size());
}
}
- incompleteTransactionIdsInBatch.remove(transactionId);
+ iter.remove();
}
}
if (incompleteTransactionIdsInBatch.size() == 0 ||
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
index 1f4d81e..41af132 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
@@ -457,6 +458,30 @@ public class ParallelGatewaySenderQueueJUnitTest {
}
@Test
+ public void peekEventsFromIncompleteTransactionsDoesNotThrowConcurrentModificationExceptionWhenCompletingTwoTransactions() {
+ mockGatewaySenderStats();
+ GatewaySenderEventImpl event1 = createGatewaySenderEventImpl(1, false);
+ GatewaySenderEventImpl event2 = createGatewaySenderEventImpl(2, false);
+ GatewaySenderEventImpl event3 = createGatewaySenderEventImpl(1, true);
+ GatewaySenderEventImpl event4 = createGatewaySenderEventImpl(2, true);
+
+ Queue backingList = new LinkedList();
+ backingList.add(event3);
+ backingList.add(event4);
+ BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList);
+
+ TestableParallelGatewaySenderQueue queue = new TestableParallelGatewaySenderQueue(sender,
+ Collections.emptySet(), 0, 1, metaRegionFactory);
+ queue.setGroupTransactionEvents(true);
+ queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue);
+
+ List<GatewaySenderEventImpl> batch = new ArrayList<>(Arrays.asList(event1, event2));
+ PartitionedRegion mockBucketRegion = mockPR("bucketRegion");
+ queue.peekEventsFromIncompleteTransactions(batch, mockBucketRegion);
+ }
+
+
+ @Test
public void testCalculateTimeToSleepNegativeInputReturnsZero() {
assertEquals(0L, ParallelGatewaySenderQueue.calculateTimeToSleep(-3));
}
@@ -476,6 +501,7 @@ public class ParallelGatewaySenderQueueJUnitTest {
assertEquals(2L, ParallelGatewaySenderQueue.calculateTimeToSleep(40));
}
+
private GatewaySenderEventImpl createGatewaySenderEventImpl(int transactionId,
boolean isLastEventInTransaction) {
GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueJUnitTest.java
index ac50a06..6c1f313 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueJUnitTest.java
@@ -24,6 +24,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -36,6 +38,7 @@ import org.junit.Test;
import org.apache.geode.CancelCriterion;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalRegionFactory;
@@ -227,6 +230,20 @@ public class SerialGatewaySenderQueueJUnitTest {
return region;
}
+ @Test
+ public void peekEventsFromIncompleteTransactionsDoesNotThrowConcurrentModificationExceptionWhenCompletingTwoTransactions() {
+ GatewaySenderEventImpl event1 = createMockGatewaySenderEventImpl(1, false, region);
+ GatewaySenderEventImpl event2 = createMockGatewaySenderEventImpl(2, false, region);
+
+ TestableSerialGatewaySenderQueue queue = new TestableSerialGatewaySenderQueue(sender,
+ QUEUE_REGION, metaRegionFactory);
+
+ queue.setGroupTransactionEvents(true);
+
+ List<AsyncEvent<?, ?>> batch = new ArrayList(Arrays.asList(event1, event2));
+ queue.peekEventsFromIncompleteTransactions(batch, 0);
+ }
+
private class TestableSerialGatewaySenderQueue extends SerialGatewaySenderQueue {
private boolean groupTransactionEvents = false;
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index c1a7a6c..34e9d00 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -2744,10 +2744,9 @@ public class WANTestBase extends DistributedTestCase {
assertNotNull(r);
long keyOffset = offset * ((putsPerTransaction + (10 * transactions)) * 100);
- long i = 0;
long j = 0;
CacheTransactionManager mgr = cache.getCacheTransactionManager();;
- for (i = 0; i < transactions; i++) {
+ for (int i = 0; i < transactions; i++) {
boolean done = false;
do {
try {