You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2020/03/13 14:16:57 UTC

[ignite] branch ignite-2.8.1 updated: IGNITE-12746 Regression in GridCacheColocatedDebugTest: putAll of sorted keys causes deadlock - Fixes #7507.

This is an automated email from the ASF dual-hosted git repository.

irakov pushed a commit to branch ignite-2.8.1
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-2.8.1 by this push:
     new 8246bd8  IGNITE-12746 Regression in GridCacheColocatedDebugTest: putAll of sorted keys causes deadlock - Fixes #7507.
8246bd8 is described below

commit 8246bd8427a93c3a3706c4e24b46d1c8758579b1
Author: Ivan Rakov <ir...@apache.org>
AuthorDate: Fri Mar 13 17:16:38 2020 +0300

    IGNITE-12746 Regression in GridCacheColocatedDebugTest: putAll of sorted keys causes deadlock - Fixes #7507.
    
    (cherry picked from commit dc547e21e1bd6a0b4e234b05751532feb8216a87)
---
 .../distributed/GridDistributedCacheEntry.java     |   7 +-
 .../cache/local/GridLocalCacheEntry.java           |   8 +-
 .../cache/transactions/IgniteTxStateImpl.java      |   2 +-
 .../dht/GridCacheColocatedDebugTest.java           | 109 +++++++++++++++++++++
 4 files changed, 121 insertions(+), 5 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index bb94058..3f0300f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -678,10 +678,13 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
             unlockEntry();
         }
 
+        boolean lockedByThreadChainVer = owner != null && owner.hasCandidate(ver);
+
+        // If locked by the thread chain version no need to do recursive thread chain scans for the same chain.
         // This call must be made outside of synchronization.
-        checkOwnerChanged(prev, owner, val, true);
+        checkOwnerChanged(prev, owner, val, lockedByThreadChainVer);
 
-        return owner == null || !owner.hasCandidate(ver); // Will return false if locked by thread chain version.
+        return !lockedByThreadChainVer;
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
index cbcb4b8..dfe9d85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
@@ -229,9 +229,13 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
             unlockEntry();
         }
 
-        checkOwnerChanged(prev, owner, val, true);
+        boolean lockedByThreadChainVer = owner != null && owner.hasCandidate(ver);
 
-        return owner == null || !owner.hasCandidate(ver); // Will return false if locked by thread chain version.
+        // If locked by the thread chain version no need to do recursive thread chain scans for the same chain.
+        // This call must be made outside of synchronization.
+        checkOwnerChanged(prev, owner, val, lockedByThreadChainVer);
+
+        return !lockedByThreadChainVer;
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 40299ed..d9d2099 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -439,7 +439,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
      * @return All entries. Returned collection is copy of internal collection.
      */
     public synchronized Collection<IgniteTxEntry> allEntriesCopy() {
-        return txMap == null ? Collections.<IgniteTxEntry>emptySet() : new HashSet<>(txMap.values());
+        return txMap == null ? Collections.<IgniteTxEntry>emptySet() : new ArrayList<>(txMap.values());
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedDebugTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedDebugTest.java
index f211587..9fa226d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedDebugTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedDebugTest.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import org.apache.ignite.Ignite;
@@ -34,6 +35,7 @@ import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
@@ -41,14 +43,17 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheTestStore;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.junit.Test;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TO_STRING_MAX_LENGTH;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
@@ -984,6 +989,110 @@ public class GridCacheColocatedDebugTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Version of check thread chain case for optimistic transactions.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    @WithSystemProperty(key = IGNITE_TO_STRING_MAX_LENGTH, value = "100000")
+    public void testConcurrentCheckThreadChainOptimistic() throws Exception {
+        testConcurrentCheckThreadChain(OPTIMISTIC);
+    }
+
+    /**
+     * Version of check thread chain case for pessimistic transactions.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    @WithSystemProperty(key = IGNITE_TO_STRING_MAX_LENGTH, value = "100000")
+    public void testConcurrentCheckThreadChainPessimistic() throws Exception {
+        testConcurrentCheckThreadChain(PESSIMISTIC);
+    }
+
+    /**
+     * Covers scenario when thread chain locks acquisition for XID 1 should be continued during unsuccessful attempt
+     * to acquire lock on certain key for XID 2 (XID 1 with uncompleted chain becomes owner of this key instead).
+     *
+     * @throws Exception If failed.
+     */
+    protected void testConcurrentCheckThreadChain(TransactionConcurrency txConcurrency) throws Exception {
+        storeEnabled = false;
+
+        startGrid(0);
+
+        try {
+            final AtomicLong iterCnt = new AtomicLong();
+
+            int commonKey = 1000;
+
+            int otherKeyPickVariance = 10;
+
+            int otherKeysCnt = 5;
+
+            int maxIterCnt = MAX_ITER_CNT * 10;
+
+            IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+                @Override public void run() {
+                    long threadId = Thread.currentThread().getId();
+
+                    long itNum;
+
+                    while ((itNum = iterCnt.getAndIncrement()) < maxIterCnt) {
+                        Map<Integer, String> vals = U.newLinkedHashMap(otherKeysCnt * 2 + 1);
+
+                        for (int i = 0; i < otherKeysCnt; i++) {
+                            int key = ThreadLocalRandom.current().nextInt(
+                                otherKeyPickVariance * i, otherKeyPickVariance * (i + 1));
+
+                            vals.put(key, String.valueOf(key) + threadId);
+                        }
+
+                        vals.put(commonKey, String.valueOf(commonKey) + threadId);
+
+                        for (int i = 0; i < otherKeysCnt; i++) {
+                            int key = ThreadLocalRandom.current().nextInt(
+                                commonKey + otherKeyPickVariance * (i + 1), otherKeyPickVariance * (i + 2) + commonKey);
+
+                            vals.put(key, String.valueOf(key) + threadId);
+                        }
+
+                        try (Transaction tx = grid(0).transactions().txStart(txConcurrency, READ_COMMITTED)) {
+                            jcache(0).putAll(vals);
+
+                            tx.commit();
+                        }
+
+                        if (itNum > 0 && itNum % 5000 == 0)
+                            info(">>> " + itNum + " iterations completed.");
+                    }
+                }
+            }, THREAD_CNT);
+
+            while (true) {
+                long prevIterCnt = iterCnt.get();
+
+                try {
+                    fut.get(5_000);
+
+                    break;
+                }
+                catch (IgniteFutureTimeoutCheckedException ignored) {
+                    if (iterCnt.get() == prevIterCnt) {
+                        Collection<IgniteInternalTx> hangingTxes =
+                            ignite(0).context().cache().context().tm().activeTransactions();
+
+                        fail(hangingTxes.toString());
+                    }
+                }
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
      * Gets key for which given node is primary.
      *
      * @param g Grid.