You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by al...@apache.org on 2021/04/13 07:01:03 UTC

[geode] branch develop updated: GEODE-9122: Avoid possible ConcurrentModificationException with group-transaction-events=true (#6278)

This is an automated email from the ASF dual-hosted git repository.

alberto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 69834d5  GEODE-9122: Avoid possible ConcurrentModificationException with group-transaction-events=true (#6278)
69834d5 is described below

commit 69834d5e81875850b1dde759d04a485f8b3aa461
Author: Alberto Gomez <al...@est.tech>
AuthorDate: Tue Apr 13 08:59:58 2021 +0200

    GEODE-9122: Avoid possible ConcurrentModificationException with group-transaction-events=true (#6278)
    
    When group-transaction-events is set to true and the collection of
    incomplete transactions iterated over in
    SerialGatewaySenderQueue.peekEventsFromIncompleteTransactions or
    ParallelGatewaySenderQueue.peekEventsFromIncompleteTransactions contains
    more than one TransactionId, removing one of them from the collection
    while iterating over it throws a ConcurrentModificationException.
    Entries are now removed through the iterator instead.
---
 .../wan/parallel/ParallelGatewaySenderQueue.java   | 10 +++++----
 .../cache/wan/serial/SerialGatewaySenderQueue.java | 10 ++++++---
 .../ParallelGatewaySenderQueueJUnitTest.java       | 26 ++++++++++++++++++++++
 .../serial/SerialGatewaySenderQueueJUnitTest.java  | 17 ++++++++++++++
 .../geode/internal/cache/wan/WANTestBase.java      |  3 +--
 5 files changed, 57 insertions(+), 9 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 260db0f..5666b0d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1374,7 +1374,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     return sender.mustGroupTransactionEvents();
   }
 
-  private void peekEventsFromIncompleteTransactions(List<GatewaySenderEventImpl> batch,
+  @VisibleForTesting
+  void peekEventsFromIncompleteTransactions(List<GatewaySenderEventImpl> batch,
       PartitionedRegion prQ) {
     if (!mustGroupTransactionEvents()) {
       return;
@@ -1388,8 +1389,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
     int retries = 0;
     while (true) {
-      for (Map.Entry<TransactionId, Integer> pendingTransaction : incompleteTransactionIdsInBatch
-          .entrySet()) {
+      for (Iterator<Map.Entry<TransactionId, Integer>> iter =
+          incompleteTransactionIdsInBatch.entrySet().iterator(); iter.hasNext();) {
+        Map.Entry<TransactionId, Integer> pendingTransaction = iter.next();
         TransactionId transactionId = pendingTransaction.getKey();
         int bucketId = pendingTransaction.getValue();
         List<Object> events = peekEventsWithTransactionId(prQ, bucketId, transactionId);
@@ -1403,7 +1405,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
                 event.getKey(), bucketId, event.isLastEventInTransaction(), batch.size());
           }
           if (event.isLastEventInTransaction()) {
-            incompleteTransactionIdsInBatch.remove(transactionId);
+            iter.remove();
           }
         }
       }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 89537e6..9e7d95d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -461,7 +462,8 @@ public class SerialGatewaySenderQueue implements RegionQueue {
     // so no need to worry about off-heap refCount.
   }
 
-  private void peekEventsFromIncompleteTransactions(List<AsyncEvent<?, ?>> batch, long lastKey) {
+  @VisibleForTesting
+  void peekEventsFromIncompleteTransactions(List<AsyncEvent<?, ?>> batch, long lastKey) {
     if (!mustGroupTransactionEvents()) {
       return;
     }
@@ -473,7 +475,9 @@ public class SerialGatewaySenderQueue implements RegionQueue {
 
     int retries = 0;
     while (true) {
-      for (TransactionId transactionId : incompleteTransactionIdsInBatch) {
+      for (Iterator<TransactionId> iter = incompleteTransactionIdsInBatch.iterator(); iter
+          .hasNext();) {
+        TransactionId transactionId = iter.next();
         List<KeyAndEventPair> keyAndEventPairs =
             peekEventsWithTransactionId(transactionId, lastKey);
         if (keyAndEventPairs.size() > 0
@@ -490,7 +494,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
                   event.getKey(), event.isLastEventInTransaction(), batch.size());
             }
           }
-          incompleteTransactionIdsInBatch.remove(transactionId);
+          iter.remove();
         }
       }
       if (incompleteTransactionIdsInBatch.size() == 0 ||
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
index 1f4d81e..41af132 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -457,6 +458,30 @@ public class ParallelGatewaySenderQueueJUnitTest {
   }
 
   @Test
+  public void peekEventsFromIncompleteTransactionsDoesNotThrowConcurrentModificationExceptionWhenCompletingTwoTransactions() {
+    mockGatewaySenderStats();
+    GatewaySenderEventImpl event1 = createGatewaySenderEventImpl(1, false);
+    GatewaySenderEventImpl event2 = createGatewaySenderEventImpl(2, false);
+    GatewaySenderEventImpl event3 = createGatewaySenderEventImpl(1, true);
+    GatewaySenderEventImpl event4 = createGatewaySenderEventImpl(2, true);
+
+    Queue backingList = new LinkedList();
+    backingList.add(event3);
+    backingList.add(event4);
+    BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList);
+
+    TestableParallelGatewaySenderQueue queue = new TestableParallelGatewaySenderQueue(sender,
+        Collections.emptySet(), 0, 1, metaRegionFactory);
+    queue.setGroupTransactionEvents(true);
+    queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue);
+
+    List<GatewaySenderEventImpl> batch = new ArrayList<>(Arrays.asList(event1, event2));
+    PartitionedRegion mockBucketRegion = mockPR("bucketRegion");
+    queue.peekEventsFromIncompleteTransactions(batch, mockBucketRegion);
+  }
+
+
+  @Test
   public void testCalculateTimeToSleepNegativeInputReturnsZero() {
     assertEquals(0L, ParallelGatewaySenderQueue.calculateTimeToSleep(-3));
   }
@@ -476,6 +501,7 @@ public class ParallelGatewaySenderQueueJUnitTest {
     assertEquals(2L, ParallelGatewaySenderQueue.calculateTimeToSleep(40));
   }
 
+
   private GatewaySenderEventImpl createGatewaySenderEventImpl(int transactionId,
       boolean isLastEventInTransaction) {
     GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueJUnitTest.java
index ac50a06..6c1f313 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueJUnitTest.java
@@ -24,6 +24,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -36,6 +38,7 @@ import org.junit.Test;
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalRegionFactory;
@@ -227,6 +230,20 @@ public class SerialGatewaySenderQueueJUnitTest {
     return region;
   }
 
+  @Test
+  public void peekEventsFromIncompleteTransactionsDoesNotThrowConcurrentModificationExceptionWhenCompletingTwoTransactions() {
+    GatewaySenderEventImpl event1 = createMockGatewaySenderEventImpl(1, false, region);
+    GatewaySenderEventImpl event2 = createMockGatewaySenderEventImpl(2, false, region);
+
+    TestableSerialGatewaySenderQueue queue = new TestableSerialGatewaySenderQueue(sender,
+        QUEUE_REGION, metaRegionFactory);
+
+    queue.setGroupTransactionEvents(true);
+
+    List<AsyncEvent<?, ?>> batch = new ArrayList(Arrays.asList(event1, event2));
+    queue.peekEventsFromIncompleteTransactions(batch, 0);
+  }
+
   private class TestableSerialGatewaySenderQueue extends SerialGatewaySenderQueue {
 
     private boolean groupTransactionEvents = false;
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index c1a7a6c..34e9d00 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -2744,10 +2744,9 @@ public class WANTestBase extends DistributedTestCase {
     assertNotNull(r);
 
     long keyOffset = offset * ((putsPerTransaction + (10 * transactions)) * 100);
-    long i = 0;
     long j = 0;
     CacheTransactionManager mgr = cache.getCacheTransactionManager();;
-    for (i = 0; i < transactions; i++) {
+    for (int i = 0; i < transactions; i++) {
       boolean done = false;
       do {
         try {