You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@ignite.apache.org by gv...@apache.org on 2019/01/22 08:24:18 UTC
[ignite] branch master updated: IGNITE-9322: MVCC: implement
deadlock detector. This closes #5579.
This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 457ffc1 IGNITE-9322: MVCC: implement deadlock detector. This closes #5579.
457ffc1 is described below
commit 457ffc1584b7808b9ab2270f8541976ec3c2f022
Author: ipavlukhin <vo...@gmail.com>
AuthorDate: Tue Jan 22 11:24:03 2019 +0300
IGNITE-9322: MVCC: implement deadlock detector. This closes #5579.
---
.../ignite/codegen/MessageCodeGenerator.java | 5 +
.../org/apache/ignite/IgniteSystemProperties.java | 12 +
.../java/org/apache/ignite/internal/GridTopic.java | 5 +-
.../communication/GridIoMessageFactory.java | 12 +
.../processors/cache/GridCacheMapEntry.java | 12 +-
.../processors/cache/GridCacheProcessor.java | 6 +-
.../processors/cache/GridCacheSharedContext.java | 50 +-
.../dht/GridDhtTxAbstractEnlistFuture.java | 4 +
.../distributed/dht/GridDhtTxLocalAdapter.java | 17 +
.../near/GridNearTxAbstractEnlistFuture.java | 7 +
.../distributed/near/GridNearTxEnlistFuture.java | 12 +-
.../near/GridNearTxQueryEnlistFuture.java | 18 +-
.../near/GridNearTxQueryResultsEnlistFuture.java | 13 +-
.../cache/mvcc/DeadlockDetectionManager.java | 348 +++++++++++
.../processors/cache/mvcc/DeadlockProbe.java | 198 +++++++
.../processors/cache/mvcc/MvccProcessor.java | 26 +-
.../processors/cache/mvcc/MvccProcessorImpl.java | 97 ++-
.../internal/processors/cache/mvcc/MvccUtils.java | 9 +
.../internal/processors/cache/mvcc/ProbedTx.java | 235 ++++++++
.../wal/reader/IgniteWalIteratorFactory.java | 4 +-
.../processors/cache/transactions/TxCounters.java | 18 +
.../managers/IgniteDiagnosticMessagesTest.java | 4 +-
.../db/wal/IgniteWalIteratorSwitchSegmentTest.java | 2 +
.../pagemem/BPlusTreePageMemoryImplTest.java | 1 +
.../BPlusTreeReuseListPageMemoryImplTest.java | 1 +
.../pagemem/IndexStoragePageMemoryImplTest.java | 1 +
.../pagemem/PageMemoryImplNoLoadTest.java | 1 +
.../persistence/pagemem/PageMemoryImplTest.java | 1 +
.../TxRollbackOnTopologyChangeTest.java | 2 +-
.../loadtests/hashmap/GridCacheTestContext.java | 3 +-
.../mvcc/CacheMvccPartitionedSqlTxQueriesTest.java | 2 +-
.../mvcc/CacheMvccSqlTxQueriesAbstractTest.java | 15 +
...cheMvccSqlTxQueriesWithReducerAbstractTest.java | 15 +
.../cache/mvcc/MvccDeadlockDetectionTest.java | 652 +++++++++++++++++++++
.../testsuites/IgniteCacheMvccSqlTestSuite.java | 3 +
35 files changed, 1750 insertions(+), 61 deletions(-)
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index ca2dcdc..186c87c 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -44,6 +44,8 @@ import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.DeadlockProbe;
+import org.apache.ignite.internal.processors.cache.mvcc.ProbedTx;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -168,6 +170,9 @@ public class MessageCodeGenerator {
MessageCodeGenerator gen = new MessageCodeGenerator(srcDir);
+ gen.generateAndWrite(ProbedTx.class);
+ gen.generateAndWrite(DeadlockProbe.class);
+
// gen.generateAll(true);
// gen.generateAndWrite(GridCacheMessage.class);
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 6b16de5..5027ebc 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -261,6 +261,18 @@ public final class IgniteSystemProperties {
public static final String IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT = "IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT";
/**
+ * Specifies the delay in milliseconds before starting the deadlock detection procedure when a tx encounters a locked key.
+ * <p>
+ * The following values can be used:
+ * <ul>
+ * <li>&lt; 0 disable detection;</li>
+ * <li>0 start detection without a delay;</li>
+ * <li>&gt; 0 start detection after a specified number of milliseconds.</li>
+ * </ul>
+ */
+ public static final String IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY = "IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY";
+
+ /**
* System property to enable pending transaction tracker.
* Affects impact of {@link IgniteSystemProperties#IGNITE_DISABLE_WAL_DURING_REBALANCING} property:
* if this property is set, WAL anyway won't be disabled during rebalancing triggered by baseline topology change.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 437ee4d..5677178 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -139,7 +139,10 @@ public enum GridTopic {
TOPIC_GEN_ENC_KEY,
/** */
- TOPIC_SERVICES;
+ TOPIC_SERVICES,
+
+ /** */
+ TOPIC_DEADLOCK_DETECTION;
/** Enum values. */
private static final GridTopic[] VALS = values();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index be467f5..b377270 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -124,8 +124,10 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQu
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.DeadlockProbe;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotWithoutTxs;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionImpl;
+import org.apache.ignite.internal.processors.cache.mvcc.ProbedTx;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryCntr;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryId;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTx;
@@ -1132,6 +1134,16 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case 170:
+ msg = new DeadlockProbe();
+
+ break;
+
+ case 171:
+ msg = new ProbedTx();
+
+ break;
+
// [-3..119] [124..129] [-23..-28] [-36..-55] - this
// [120..123] - DR
// [-4..-22, -30..-35] - SQL
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index d659aa5..be1b9e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1151,7 +1151,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
GridFutureAdapter<GridCacheUpdateTxResult> resFut = new GridFutureAdapter<>();
- IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer);
+ IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitForLock(cctx, mvccVer, lockVer);
lockFut.listen(new MvccUpdateLockListener(tx, this, affNodeId, topVer, val, ttl0, mvccVer,
op, needHistory, noCreate, resFut, needOldVal, filter, retVal, entryProc, invokeArgs));
@@ -1300,7 +1300,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
GridFutureAdapter<GridCacheUpdateTxResult> resFut = new GridFutureAdapter<>();
- IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer);
+ IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitForLock(cctx, mvccVer, lockVer);
lockFut.listen(new MvccRemoveLockListener(tx, this, affNodeId, topVer, mvccVer, needHistory,
resFut, needOldVal, retVal, filter));
@@ -1392,7 +1392,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
GridFutureAdapter<GridCacheUpdateTxResult> resFut = new GridFutureAdapter<>();
- IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer);
+ IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitForLock(cctx, mvccVer, lockVer);
lockFut.listen(new MvccAcquireLockListener(tx, this, mvccVer, resFut));
@@ -5193,7 +5193,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
else if (res.resultType() == ResultType.LOCKED) {
entry.unlockEntry();
- IgniteInternalFuture<?> lockFuture = cctx.kernalContext().coordinators().waitFor(cctx, res.resultVersion());
+ IgniteInternalFuture<?> lockFuture = cctx.kernalContext().coordinators().waitForLock(cctx, mvccVer, res.resultVersion());
lockFuture.listen(this);
@@ -5323,7 +5323,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
else if (res.resultType() == ResultType.LOCKED) {
entry.unlockEntry();
- cctx.kernalContext().coordinators().waitFor(cctx, res.resultVersion()).listen(this);
+ cctx.kernalContext().coordinators().waitForLock(cctx, mvccVer, res.resultVersion()).listen(this);
return;
}
@@ -5497,7 +5497,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
else if (res.resultType() == ResultType.LOCKED) {
entry.unlockEntry();
- cctx.kernalContext().coordinators().waitFor(cctx, res.resultVersion()).listen(this);
+ cctx.kernalContext().coordinators().waitForLock(cctx, mvccVer, res.resultVersion()).listen(this);
return;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 1e336e5..933a9e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -98,6 +98,7 @@ import org.apache.ignite.internal.processors.cache.dr.GridCacheDrManager;
import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
import org.apache.ignite.internal.processors.cache.local.atomic.GridLocalAtomicCache;
+import org.apache.ignite.internal.processors.cache.mvcc.DeadlockDetectionManager;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
@@ -3218,6 +3219,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
MvccCachingManager mvccCachingMgr = new MvccCachingManager();
+ DeadlockDetectionManager deadlockDetectionMgr = new DeadlockDetectionManager();
+
return new GridCacheSharedContext(
kernalCtx,
tm,
@@ -3236,7 +3239,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
evict,
jta,
storeSesLsnrs,
- mvccCachingMgr
+ mvccCachingMgr,
+ deadlockDetectionMgr
);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 6c72169..bc3cf39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionsEvictManager;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
+import org.apache.ignite.internal.processors.cache.mvcc.DeadlockDetectionManager;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager;
import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
@@ -115,7 +116,7 @@ public class GridCacheSharedContext<K, V> {
/** Database manager. */
private IgniteCacheDatabaseSharedManager dbMgr;
- /** Snp manager. */
+ /** Snapshot manager. */
private IgniteCacheSnapshotManager snpMgr;
/** Page store manager. {@code Null} if persistence is not enabled. */
@@ -127,12 +128,15 @@ public class GridCacheSharedContext<K, V> {
/** Ttl cleanup manager. */
private GridCacheSharedTtlCleanupManager ttlMgr;
- /** */
+ /** Partitions evict manager. */
private PartitionsEvictManager evictMgr;
/** Mvcc caching manager. */
private MvccCachingManager mvccCachingMgr;
+ /** Deadlock detection manager. */
+ private DeadlockDetectionManager deadlockDetectionMgr;
+
/** Cache contexts map. */
private ConcurrentHashMap<Integer, GridCacheContext<K, V>> ctxMap;
@@ -187,12 +191,17 @@ public class GridCacheSharedContext<K, V> {
* @param walMgr WAL manager. {@code Null} if persistence is not enabled.
* @param walStateMgr WAL state manager.
* @param depMgr Deployment manager.
+ * @param dbMgr Database manager.
+ * @param snpMgr Snapshot manager.
* @param exchMgr Exchange manager.
* @param affMgr Affinity manager.
* @param ioMgr IO manager.
* @param ttlMgr Ttl cleanup manager.
+ * @param evictMgr Partitions evict manager.
* @param jtaMgr JTA manager.
* @param storeSesLsnrs Store session listeners.
+ * @param mvccCachingMgr Mvcc caching manager.
+ * @param deadlockDetectionMgr Deadlock detection manager.
*/
public GridCacheSharedContext(
GridKernalContext kernalCtx,
@@ -212,7 +221,8 @@ public class GridCacheSharedContext<K, V> {
PartitionsEvictManager evictMgr,
CacheJtaManagerAdapter jtaMgr,
Collection<CacheStoreSessionListener> storeSesLsnrs,
- MvccCachingManager mvccCachingMgr
+ MvccCachingManager mvccCachingMgr,
+ DeadlockDetectionManager deadlockDetectionMgr
) {
this.kernalCtx = kernalCtx;
@@ -233,7 +243,8 @@ public class GridCacheSharedContext<K, V> {
ioMgr,
ttlMgr,
evictMgr,
- mvccCachingMgr
+ mvccCachingMgr,
+ deadlockDetectionMgr
);
this.storeSesLsnrs = storeSesLsnrs;
@@ -400,8 +411,8 @@ public class GridCacheSharedContext<K, V> {
ioMgr,
ttlMgr,
evictMgr,
- mvccCachingMgr
- );
+ mvccCachingMgr,
+ deadlockDetectionMgr);
this.mgrs = mgrs;
@@ -428,20 +439,7 @@ public class GridCacheSharedContext<K, V> {
return mgr instanceof GridCacheDeploymentManager || mgr instanceof GridCachePartitionExchangeManager;
}
- /**
- * @param mgrs Managers list.
- * @param txMgr Transaction manager.
- * @param jtaMgr JTA manager.
- * @param verMgr Version manager.
- * @param mvccMgr MVCC manager.
- * @param pageStoreMgr Page store manager. {@code Null} if persistence is not enabled.
- * @param walStateMgr WAL state manager.
- * @param depMgr Deployment manager.
- * @param exchMgr Exchange manager.
- * @param affMgr Affinity manager.
- * @param ioMgr IO manager.
- * @param ttlMgr Ttl cleanup manager.
- */
+ /** */
@SuppressWarnings("unchecked")
private void setManagers(
List<GridCacheSharedManager<K, V>> mgrs,
@@ -460,8 +458,8 @@ public class GridCacheSharedContext<K, V> {
GridCacheIoManager ioMgr,
GridCacheSharedTtlCleanupManager ttlMgr,
PartitionsEvictManager evictMgr,
- MvccCachingManager mvccCachingMgr
- ) {
+ MvccCachingManager mvccCachingMgr,
+ DeadlockDetectionManager deadlockDetectionMgr) {
this.mvccMgr = add(mgrs, mvccMgr);
this.verMgr = add(mgrs, verMgr);
this.txMgr = add(mgrs, txMgr);
@@ -478,6 +476,7 @@ public class GridCacheSharedContext<K, V> {
this.ttlMgr = add(mgrs, ttlMgr);
this.evictMgr = add(mgrs, evictMgr);
this.mvccCachingMgr = add(mgrs, mvccCachingMgr);
+ this.deadlockDetectionMgr = add(mgrs, deadlockDetectionMgr);
}
/**
@@ -831,6 +830,13 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * @return Deadlock detection manager.
+ */
+ public DeadlockDetectionManager deadlockDetectionMgr() {
+ return deadlockDetectionMgr;
+ }
+
+ /**
* @return Node ID.
*/
public UUID localNodeId() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index 25242c6..3f53b48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -508,6 +508,8 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
updateFut.listen(new CI1<IgniteInternalFuture<GridCacheUpdateTxResult>>() {
@Override public void apply(IgniteInternalFuture<GridCacheUpdateTxResult> fut) {
try {
+ tx.incrementLockCounter();
+
processEntry(entry0, op, fut.get(), val0, backups0);
continueLoop(true);
@@ -523,6 +525,8 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
}
}
+ tx.incrementLockCounter();
+
processEntry(entry, op, res, val0, backups);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 6ef9000..daaa5b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPr
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
+import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridLeanMap;
@@ -940,4 +941,20 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(),
"dhtNodes", dhtMap.keySet(), "explicitLock", explicitLock, "super", super.toString());
}
+
+ /**
+ * Increments lock counter.
+ */
+ public void incrementLockCounter() {
+ txCounters(true).incrementLockCounter();
+ }
+
+ /**
+ * @return Current value of lock counter.
+ */
+ public int lockCounter() {
+ TxCounters txCntrs = txCounters(false);
+
+ return txCntrs != null ? txCntrs.lockCounter() : 0;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
index b782483..afed6e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.ignite.IgniteCacheRestartingException;
@@ -485,6 +487,11 @@ public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoun
protected abstract void map(boolean topLocked);
/**
+ * @return Nodes from which the current future awaits responses.
+ */
+ public abstract Set<UUID> pendingResponseNodes();
+
+ /**
* Lock request timeout object.
*/
private class LockTimeoutObject extends GridTimeoutObjectAdapter {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
index d98065d..87d2dc6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
@@ -22,11 +22,13 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -164,7 +166,7 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
boolean first = (nodeId != null);
// Need to unlock topology to avoid deadlock with binary descriptors registration.
- if(!topLocked && cctx.topology().holdsLock())
+ if (!topLocked && cctx.topology().holdsLock())
cctx.topology().readUnlock();
for (Batch batch : next) {
@@ -619,6 +621,14 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
}
/** {@inheritDoc} */
+ @Override public Set<UUID> pendingResponseNodes() {
+ return batches.entrySet().stream()
+ .filter(e -> e.getValue().ready())
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNearTxEnlistFuture.class, this, super.toString());
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
index b0a83dc..deeb8b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
@@ -18,7 +18,10 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -135,7 +138,7 @@ public class GridNearTxQueryEnlistFuture extends GridNearTxQueryAbstractEnlistFu
boolean clientFirst = false;
// Need to unlock topology to avoid deadlock with binary descriptors registration.
- if(!topLocked && cctx.topology().holdsLock())
+ if (!topLocked && cctx.topology().holdsLock())
cctx.topology().readUnlock();
for (ClusterNode node : F.view(primary, F.remoteNodes(cctx.localNodeId()))) {
@@ -330,6 +333,19 @@ public class GridNearTxQueryEnlistFuture extends GridNearTxQueryAbstractEnlistFu
}
/** {@inheritDoc} */
+ @Override public Set<UUID> pendingResponseNodes() {
+ if (initialized() && !isDone()) {
+ return futures().stream()
+ .map(MiniFuture.class::cast)
+ .filter(mini -> !mini.isDone())
+ .map(mini -> mini.node.id())
+ .collect(Collectors.toSet());
+ }
+
+ return Collections.emptySet();
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNearTxQueryEnlistFuture.class, this, super.toString());
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
index e4d74f2..e3bbed4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
@@ -22,11 +22,13 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -151,7 +153,7 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE
boolean first = (nodeId != null);
// Need to unlock topology to avoid deadlock with binary descriptors registration.
- if(!topLocked && cctx.topology().holdsLock())
+ if (!topLocked && cctx.topology().holdsLock())
cctx.topology().readUnlock();
for (Batch batch : next) {
@@ -585,6 +587,14 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE
}
/** {@inheritDoc} */
+ @Override public Set<UUID> pendingResponseNodes() {
+ return batches.entrySet().stream()
+ .filter(e -> e.getValue().ready())
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNearTxQueryResultsEnlistFuture.class, this, super.toString());
}
@@ -674,5 +684,4 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE
this.ready = ready;
}
}
-
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/DeadlockDetectionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/DeadlockDetectionManager.java
new file mode 100644
index 0000000..9b1969b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/DeadlockDetectionManager.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxAbstractEnlistFuture;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
+import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
+
+import static java.util.Collections.singleton;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY;
+import static org.apache.ignite.internal.GridTopic.TOPIC_DEADLOCK_DETECTION;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.belongToSameTx;
+
+/**
+ * Component participating in deadlock detection in a cluster. Detection process is collaborative and it is performed
+ * by relaying special probe messages from a waiting transaction to its blocker.
+ * <p>
+ * Ideas for the used detection algorithm are borrowed from Chandy-Misra-Haas deadlock detection algorithm for resource
+ * model.
+ * <p>
+ * Current implementation assumes that transactions obey 2PL.
+ */
+public class DeadlockDetectionManager extends GridCacheSharedManagerAdapter {
+ /**
+ * Delay before a detection is started for a wait-for edge (used as a timeout of {@link DelayedDeadlockComputation}).
+ * {@code 0} starts a detection immediately, a negative value disables detection.
+ */
+ private final long detectionStartDelay = Long.getLong(IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY, 10_000);
+
+ /** {@inheritDoc} */
+ @Override protected void start0() throws IgniteCheckedException {
+ cctx.gridIO().addMessageListener(TOPIC_DEADLOCK_DETECTION, (nodeId, msg, plc) -> {
+ if (msg instanceof DeadlockProbe) {
+ if (log.isDebugEnabled())
+ log.debug("Received a probe message [msg=" + msg + ']');
+
+ DeadlockProbe msg0 = (DeadlockProbe)msg;
+
+ handleDeadlockProbe(msg0);
+ }
+ else
+ log.warning("Unexpected message received [node=" + nodeId + ", msg=" + msg + ']');
+ });
+ }
+
+ /**
+ * Starts a deadlock detection after a delay.
+ *
+ * @param waiterVer Version of the waiting transaction.
+ * @param blockerVer Version of the waited for transaction.
+ * @return Cancellable computation, or {@code null} if detection is disabled or has already been started
+ * immediately (zero delay).
+ */
+ public DelayedDeadlockComputation initDelayedComputation(MvccVersion waiterVer, MvccVersion blockerVer) {
+ if (detectionStartDelay < 0)
+ return null;
+
+ if (detectionStartDelay == 0) {
+ startComputation(waiterVer, blockerVer);
+
+ return null;
+ }
+
+ DelayedDeadlockComputation computation =
+ new DelayedDeadlockComputation(waiterVer, blockerVer, detectionStartDelay);
+
+ // Registered here rather than in the constructor to avoid publishing a partially constructed object.
+ cctx.kernalContext().timeout().addTimeoutObject(computation);
+
+ return computation;
+ }
+
+ /**
+ * Starts a deadlock detection for a given pair of transaction versions (wait-for edge).
+ * Does nothing if either transaction is not found among local active transactions.
+ *
+ * @param waiterVer Version of the waiting transaction.
+ * @param blockerVer Version of the waited for transaction.
+ */
+ private void startComputation(MvccVersion waiterVer, MvccVersion blockerVer) {
+ if (log.isDebugEnabled())
+ log.debug("Starting deadlock detection [waiterVer=" + waiterVer + ", blockerVer=" + blockerVer + ']');
+
+ Optional<GridDhtTxLocalAdapter> waitingTx = findTx(waiterVer);
+
+ Optional<GridDhtTxLocalAdapter> blockerTx = findTx(blockerVer);
+
+ if (waitingTx.isPresent() && blockerTx.isPresent()) {
+ GridDhtTxLocalAdapter wTx = waitingTx.get();
+
+ GridDhtTxLocalAdapter bTx = blockerTx.get();
+
+ sendProbe(
+ bTx.eventNodeId(),
+ wTx.xidVersion(),
+ // real start time will be filled later when corresponding near node is visited
+ singleton(new ProbedTx(wTx.nodeId(), wTx.xidVersion(), wTx.nearXidVersion(), -1, wTx.lockCounter())),
+ new ProbedTx(bTx.nodeId(), bTx.xidVersion(), bTx.nearXidVersion(), -1, bTx.lockCounter()),
+ true);
+ }
+ }
+
+ /**
+ * Finds an active local transaction whose MVCC snapshot belongs to the same transaction as the given version.
+ *
+ * @param mvccVer MVCC version to look up.
+ * @return Found transaction or an empty {@link Optional}.
+ */
+ private Optional<GridDhtTxLocalAdapter> findTx(MvccVersion mvccVer) {
+ return cctx.tm().activeTransactions().stream()
+ .filter(tx -> tx.local() && tx.mvccSnapshot() != null)
+ .filter(tx -> belongToSameTx(mvccVer, tx.mvccSnapshot()))
+ .map(GridDhtTxLocalAdapter.class::cast)
+ .findAny();
+ }
+
+ /**
+ * Handles received deadlock probe. Possible outcomes:
+ * <ol>
+ * <li>Deadlock is found.</li>
+ * <li>Probe is relayed to other blocking transactions.</li>
+ * <li>Probe is discarded because receiving transaction is not blocked.</li>
+ * </ol>
+ *
+ * @param probe Received probe message.
+ */
+ private void handleDeadlockProbe(DeadlockProbe probe) {
+ if (probe.nearCheck())
+ handleDeadlockProbeForNear(probe);
+ else
+ handleDeadlockProbeForDht(probe);
+ }
+
+ /**
+ * Handles a probe addressed to the near transaction of the blocker: relays the probe to every node
+ * from which the near transaction's lock future still expects a response.
+ *
+ * @param probe Received probe message.
+ */
+ private void handleDeadlockProbeForNear(DeadlockProbe probe) {
+ // a probe is simply discarded if next wait-for edge is not found
+ ProbedTx blocker = probe.blocker();
+
+ GridNearTxLocal nearTx = cctx.tm().tx(blocker.nearXidVersion());
+
+ if (nearTx == null)
+ return;
+
+ // probe each blocker
+ for (UUID pendingNodeId : getPendingResponseNodes(nearTx)) {
+ sendProbe(
+ pendingNodeId,
+ probe.initiatorVersion(),
+ probe.waitChain(),
+ // real start time is filled here
+ blocker.withStartTime(nearTx.startTime()),
+ false);
+ }
+ }
+
+ /**
+ * Handles a probe addressed to a local (dht) transaction of the blocker: a deadlock is reported if the
+ * transaction is already present in the wait chain, otherwise the probe is relayed if the transaction waits itself.
+ *
+ * @param probe Received probe message.
+ */
+ private void handleDeadlockProbeForDht(DeadlockProbe probe) {
+ // a probe is simply discarded if next wait-for edge is not found
+ cctx.tm().activeTransactions().stream()
+ .filter(IgniteInternalTx::local)
+ .filter(tx -> tx.nearXidVersion().equals(probe.blocker().nearXidVersion()))
+ .findAny()
+ .map(GridDhtTxLocalAdapter.class::cast)
+ .ifPresent(tx -> {
+ // search for locally checked tx (identified as blocker previously) in the wait chain
+ Optional<ProbedTx> repeatedTx = probe.waitChain().stream()
+ .filter(wTx -> wTx.xidVersion().equals(tx.xidVersion()))
+ .findAny();
+
+ if (repeatedTx.isPresent()) {
+ // a deadlock found
+ resolveDeadlock(probe, repeatedTx.get(), tx);
+ }
+ else
+ relayProbeIfLocalTxIsWaiting(probe, tx);
+ });
+ }
+
+ /**
+ * Chooses a victim among deadlocked transactions and either aborts it (when it is the local transaction)
+ * or sends a probe to the node where the victim runs.
+ *
+ * @param probe Probe which revealed the deadlock.
+ * @param repeatedTx Wait chain entry corresponding to the local transaction.
+ * @param locTx Local transaction found repeatedly on the wait-for path.
+ */
+ private void resolveDeadlock(DeadlockProbe probe, ProbedTx repeatedTx, GridDhtTxLocalAdapter locTx) {
+ if (log.isDebugEnabled())
+ log.debug("Deadlock detected [probe=" + probe + ']');
+
+ ProbedTx victim = chooseVictim(
+ // real start time is filled here for repeated tx
+ repeatedTx.withStartTime(probe.blocker().startTime()),
+ probe.waitChain());
+
+ if (victim.xidVersion().equals(locTx.xidVersion())) {
+ if (log.isDebugEnabled())
+ log.debug("Chosen victim is on local node, tx will be aborted [victim=" + victim + ']');
+
+ // if a victim tx has made a progress since it was identified as waiting
+ // it means that detected deadlock was broken by other means (e.g. timeout of another tx)
+ if (victim.lockCounter() == locTx.lockCounter())
+ abortTx(locTx);
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Chosen victim is on remote node, message will be sent [victim=" + victim + ']');
+
+ // destination node must determine itself as a victim
+ sendProbe(victim.nodeId(), probe.initiatorVersion(), singleton(victim), victim, false);
+ }
+ }
+
+ /**
+ * Relays the probe to the transaction blocking the local one if the local transaction is waiting for a lock.
+ *
+ * @param probe Received probe message.
+ * @param locTx Local transaction.
+ */
+ private void relayProbeIfLocalTxIsWaiting(DeadlockProbe probe, GridDhtTxLocalAdapter locTx) {
+ assert locTx.mvccSnapshot() != null;
+
+ cctx.coordinators().checkWaiting(locTx.mvccSnapshot())
+ .flatMap(this::findTx)
+ .ifPresent(nextBlocker -> {
+ ArrayList<ProbedTx> waitChain = new ArrayList<>(probe.waitChain().size() + 1);
+ waitChain.addAll(probe.waitChain());
+ // real start time is filled here
+ waitChain.add(new ProbedTx(locTx.nodeId(), locTx.xidVersion(), locTx.nearXidVersion(),
+ probe.blocker().startTime(), locTx.lockCounter()));
+
+ // real start time will be filled later when corresponding near node is visited
+ ProbedTx nextProbedTx = new ProbedTx(nextBlocker.nodeId(), nextBlocker.xidVersion(),
+ nextBlocker.nearXidVersion(), -1, nextBlocker.lockCounter());
+
+ sendProbe(
+ nextBlocker.eventNodeId(),
+ probe.initiatorVersion(),
+ waitChain,
+ nextProbedTx,
+ true);
+ });
+ }
+
+ /**
+ * Chooses a victim based on tx start time. The algorithm chooses the victim in such a way that every site which
+ * detected a deadlock chooses the same victim. As a result only one tx participating in a deadlock is aborted.
+ * <p>
+ * Local tx is needed here because its start time might not be filled yet in the corresponding wait chain entry.
+ *
+ * @param locTx Deadlocked tx on local node.
+ * @param waitChain Wait chain.
+ * @return Tx chosen as a victim.
+ */
+ @SuppressWarnings("StatementWithEmptyBody")
+ private ProbedTx chooseVictim(ProbedTx locTx, Collection<ProbedTx> waitChain) {
+ Iterator<ProbedTx> it = waitChain.iterator();
+
+ // skip until local tx (inclusive), because txs before are not deadlocked
+ while (it.hasNext() && !it.next().xidVersion().equals(locTx.xidVersion()));
+
+ ProbedTx victim = locTx;
+ long maxStartTime = locTx.startTime();
+
+ while (it.hasNext()) {
+ ProbedTx tx = it.next();
+
+ // seek for youngest tx in order to guarantee forward progress
+ if (tx.startTime() > maxStartTime) {
+ maxStartTime = tx.startTime();
+ victim = tx;
+ }
+ // tie-breaking
+ else if (tx.startTime() == maxStartTime && tx.nearXidVersion().compareTo(victim.nearXidVersion()) > 0)
+ victim = tx;
+ }
+
+ return victim;
+ }
+
+ /**
+ * Fails the lock wait of the given transaction with a rollback exception.
+ *
+ * @param tx Transaction to abort.
+ */
+ private void abortTx(GridDhtTxLocalAdapter tx) {
+ cctx.coordinators().failWaiter(tx.mvccSnapshot(), new IgniteTxRollbackCheckedException(
+ "Deadlock detected. Transaction will be rolled back [tx=" + tx + ']'));
+ }
+
+ /**
+ * @param tx Near transaction.
+ * @return Nodes reported as pending by the transaction's enlist future, or an empty set if its lock future
+ * is not an enlist future.
+ */
+ private Set<UUID> getPendingResponseNodes(GridNearTxLocal tx) {
+ IgniteInternalFuture<?> lockFut = tx.lockFuture();
+
+ if (lockFut instanceof GridNearTxAbstractEnlistFuture)
+ return ((GridNearTxAbstractEnlistFuture<?>)lockFut).pendingResponseNodes();
+
+ return Collections.emptySet();
+ }
+
+ /**
+ * Sends a deadlock probe to the given node. Send failures are logged and not propagated.
+ *
+ * @param destNodeId Destination node id.
+ * @param initiatorVer Version of the transaction which started the detection.
+ * @param waitChain Chain of transactions identified as waiting.
+ * @param blocker Transaction blocking the last transaction in the wait chain.
+ * @param near {@code True} if the destination should check a near transaction, {@code false} for a dht one.
+ */
+ private void sendProbe(UUID destNodeId, GridCacheVersion initiatorVer, Collection<ProbedTx> waitChain,
+ ProbedTx blocker, boolean near) {
+ DeadlockProbe probe = new DeadlockProbe(initiatorVer, waitChain, blocker, near);
+
+ if (log.isDebugEnabled())
+ log.debug("Sending probe [probe=" + probe + ", destNode=" + destNodeId + ']');
+
+ try {
+ cctx.gridIO().sendToGridTopic(destNodeId, TOPIC_DEADLOCK_DETECTION, probe, SYSTEM_POOL);
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ // Destination node is not in topology anymore, the probe is dropped.
+ if (log.isDebugEnabled())
+ log.debug("Failed to send a deadlock probe, node left topology [nodeId=" + destNodeId + ']');
+ }
+ catch (IgniteCheckedException e) {
+ log.warning("Failed to send a deadlock probe [nodeId=" + destNodeId + ']', e);
+ }
+ }
+
+ /**
+ * Delayed deadlock probe computation which can be cancelled.
+ */
+ public class DelayedDeadlockComputation extends GridTimeoutObjectAdapter {
+ /** Version of the waiting transaction. */
+ private final MvccVersion waiterVer;
+
+ /** Version of the waited for transaction. */
+ private final MvccVersion blockerVer;
+
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
+ startComputation(waiterVer, blockerVer);
+ }
+
+ /**
+ * @param waiterVer Version of the waiting transaction.
+ * @param blockerVer Version of the waited for transaction.
+ * @param timeout Delay before the detection is started.
+ */
+ private DelayedDeadlockComputation(MvccVersion waiterVer, MvccVersion blockerVer, long timeout) {
+ super(timeout);
+ this.waiterVer = waiterVer;
+ this.blockerVer = blockerVer;
+ }
+
+ /**
+ * Cancels the computation by unregistering it from the timeout processor.
+ */
+ public void cancel() {
+ cctx.kernalContext().timeout().removeTimeoutObject(this);
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/DeadlockProbe.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/DeadlockProbe.java
new file mode 100644
index 0000000..47bad8c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/DeadlockProbe.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Probe message travelling between transactions (from a waiting one to a blocking one) during deadlock detection.
+ *
+ * @see DeadlockDetectionManager
+ */
+public class DeadlockProbe implements Message {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0;
+
+ /** Identifier of the transaction which started the deadlock detection. */
+ private GridCacheVersion initiatorVer;
+ /** Transactions identified as waiting so far during the detection. */
+ @GridToStringInclude
+ @GridDirectCollection(ProbedTx.class)
+ private Collection<ProbedTx> waitChain;
+ /** Transaction identified as blocking the last transaction in the wait chain. */
+ private ProbedTx blocker;
+ /** {@code True} if the receiver checks a near transaction, {@code false} if it checks a dht one. */
+ private boolean nearCheck;
+
+ /** Empty constructor (presumably used by the message factory on deserialization — confirm). */
+ public DeadlockProbe() {
+ }
+
+ /**
+ * @param initiatorVer Identifier of the transaction which started the deadlock detection.
+ * @param waitChain Chain of transactions identified as waiting (stored as is, not copied).
+ * @param blocker Transaction identified as blocking the last transaction in the wait chain.
+ * @param nearCheck {@code True} if the receiver should check a near transaction, {@code false} for a dht one.
+ */
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+ public DeadlockProbe(GridCacheVersion initiatorVer, Collection<ProbedTx> waitChain,
+ ProbedTx blocker, boolean nearCheck) {
+ this.initiatorVer = initiatorVer;
+ this.waitChain = waitChain;
+ this.blocker = blocker;
+ this.nearCheck = nearCheck;
+ }
+
+ /**
+ * @return Identifier of the transaction which started the deadlock detection process. Can be used for diagnostics.
+ */
+ public GridCacheVersion initiatorVersion() {
+ return initiatorVer;
+ }
+
+ /**
+ * @return Chain of transactions identified as waiting during deadlock detection (the internal collection,
+ * not a copy).
+ */
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+ public Collection<ProbedTx> waitChain() {
+ return waitChain;
+ }
+
+ /**
+ * @return Identifier of a transaction identified as blocking the last transaction in the wait chain
+ * during deadlock detection.
+ */
+ public ProbedTx blocker() {
+ return blocker;
+ }
+
+ /**
+ * @return {@code True} if the receiver should check whether the near transaction is waiting,
+ * {@code false} if it should check the dht transaction.
+ */
+ public boolean nearCheck() {
+ return nearCheck;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ // One field per state, in a fixed order, falling through the cases: the state is advanced only after a
+ // successful write, so an interrupted write (returned false) resumes from the same field.
+ // readFrom() must use exactly the same order.
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeMessage("blocker", blocker))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeMessage("initiatorVer", initiatorVer))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeBoolean("nearCheck", nearCheck))
+ return false;
+
+ writer.incrementState();
+
+ case 3:
+ if (!writer.writeCollection("waitChain", waitChain, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ // Mirrors the field order used by writeTo().
+ switch (reader.state()) {
+ case 0:
+ blocker = reader.readMessage("blocker");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ initiatorVer = reader.readMessage("initiatorVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ nearCheck = reader.readBoolean("nearCheck");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 3:
+ waitChain = reader.readCollection("waitChain", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(DeadlockProbe.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ // NOTE(review): presumably must match the id registered in the message factory — confirm.
+ return 170;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ // Number of fields serialized by writeTo().
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DeadlockProbe.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
index 9f8f702..2910eed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.mvcc;
+import java.util.Optional;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -93,11 +94,15 @@ public interface MvccProcessor extends GridProcessor {
boolean hasLocalTransaction(long crd, long cntr);
/**
+ * Puts the waiting transaction into the lock wait queue of the transaction currently holding the lock.
+ *
* @param cctx Cache context.
- * @param locked Version the entry is locked by.
+ * @param waiterVer Version of the waiting tx.
+ * @param blockerVer Version the entry is locked by.
* @return Future, which is completed as soon as the lock is released.
*/
- IgniteInternalFuture<Void> waitFor(GridCacheContext cctx, MvccVersion locked);
+ IgniteInternalFuture<Void> waitForLock(GridCacheContext cctx, MvccVersion waiterVer,
+ MvccVersion blockerVer);
/**
* @param locked Version the entry is locked by.
@@ -206,4 +211,21 @@ public interface MvccProcessor extends GridProcessor {
* @throws IgniteCheckedException If failed to initialize.
*/
void ensureStarted() throws IgniteCheckedException;
+
+ /**
+ * Checks whether one tx is waiting for another tx.
+ * It is assumed that locks on data nodes are requested one by one, so a tx can wait for only one other tx here.
+ *
+ * @param mvccVer Version of transaction which is checked for being waiting.
+ * @return Version of the tx which blocks the checked tx, or an empty {@link Optional} if the checked tx
+ * is not waiting.
+ */
+ Optional<? extends MvccVersion> checkWaiting(MvccVersion mvccVer);
+
+ /**
+ * Unfreezes the waiter for a specific version by completing its lock future with the passed exception.
+ * Does nothing if no waiter for the version is found.
+ *
+ * @param mvccVer Version of a waiter to fail.
+ * @param e Exception reflecting failure reason.
+ */
+ void failWaiter(MvccVersion mvccVer, Exception e);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index f3c563c..b333d33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
@@ -119,6 +120,7 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_IN
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_READ_OP_CNTR;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_START_CNTR;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_START_OP_CNTR;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.belongToSameTx;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.compare;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.hasNewVersion;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.isVisible;
@@ -234,7 +236,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
@Override public void start() throws IgniteCheckedException {
ctx.event().addDiscoveryEventListener(this::onDiscovery, EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED);
- ctx.io().addMessageListener(TOPIC_CACHE_COORDINATOR, new CoordinatorMessageListener());
+ ctx.io().addMessageListener(TOPIC_CACHE_COORDINATOR, new MvccMessageListener());
ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class,
new CustomEventListener<DynamicCacheChangeBatch>() {
@@ -634,15 +636,23 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Void> waitFor(GridCacheContext cctx, MvccVersion locked) {
- TxKey key = new TxKey(locked.coordinatorVersion(), locked.counter());
+ @Override public IgniteInternalFuture<Void> waitForLock(GridCacheContext cctx, MvccVersion waiterVer,
+ MvccVersion blockerVer) {
+ TxKey key = new TxKey(blockerVer.coordinatorVersion(), blockerVer.counter());
- LockFuture fut = new LockFuture(cctx.ioPolicy());
+ LockFuture fut = new LockFuture(cctx.ioPolicy(), waiterVer);
Waiter waiter = waitMap.merge(key, fut, Waiter::concat);
if (!waiter.hasLocalTransaction() && (waiter = waitMap.remove(key)) != null)
waiter.run(ctx);
+ else {
+ DeadlockDetectionManager.DelayedDeadlockComputation delayedComputation
+ = ctx.cache().context().deadlockDetectionMgr().initDelayedComputation(waiterVer, blockerVer);
+
+ if (delayedComputation != null)
+ fut.listen(fut0 -> delayedComputation.cancel());
+ }
return fut;
}
@@ -1059,13 +1069,11 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
vacuumWorkers.add(new VacuumScheduler(ctx, log, this));
- for (int i = 0; i < ctx.config().getMvccVacuumThreadCount(); i++) {
+ for (int i = 0; i < ctx.config().getMvccVacuumThreadCount(); i++)
vacuumWorkers.add(new VacuumWorker(ctx, log, cleanupQueue));
- }
- for (GridWorker worker : vacuumWorkers) {
+ for (GridWorker worker : vacuumWorkers)
new IgniteThread(worker).start();
- }
return;
}
@@ -1107,9 +1115,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
if (!queue.isEmpty()) {
IgniteCheckedException ex = vacuumCancelledException();
- for (VacuumTask task : queue) {
+ for (VacuumTask task : queue)
task.onDone(ex);
- }
}
}
}
@@ -1628,17 +1635,35 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
}
}
+ /** {@inheritDoc} */
+ @Override public Optional<? extends MvccVersion> checkWaiting(MvccVersion mvccVer) {
+ // A wait map key identifies the lock holder the waiters of that entry are queued for.
+ for (Map.Entry<TxKey, Waiter> entry : waitMap.entrySet()) {
+ if (entry.getValue().lockFuture(mvccVer) == null)
+ continue;
+
+ TxKey blockerKey = entry.getKey();
+
+ return Optional.of(new MvccVersionImpl(blockerKey.major(), blockerKey.minor(), 0));
+ }
+
+ return Optional.empty();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void failWaiter(MvccVersion mvccVer, Exception e) {
+ // Complete the first lock future registered by the given transaction, if any.
+ for (Waiter waiter : waitMap.values()) {
+ GridFutureAdapter<?> lockFut = waiter.lockFuture(mvccVer);
+
+ if (lockFut != null) {
+ lockFut.onDone(e);
+
+ return;
+ }
+ }
+ }
+
/**
*
*/
- private class CoordinatorMessageListener implements GridMessageListener {
+ private class MvccMessageListener implements GridMessageListener {
/** {@inheritDoc} */
@Override public void onMessage(UUID nodeId, Object msg, byte plc) {
MvccMessage msg0 = (MvccMessage)msg;
if (msg0.waitForCoordinatorInit() && !initFut.isDone()) {
initFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
- @Override public void apply(IgniteInternalFuture<Void> future) {
+ @Override public void apply(IgniteInternalFuture<Void> fut) {
assert curCrd.local();
processMessage(nodeId, msg);
@@ -1680,7 +1705,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
/** {@inheritDoc} */
@Override public String toString() {
- return "CoordinatorMessageListener[]";
+ return "MvccMessageListener[]";
}
}
@@ -1782,18 +1807,28 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
* @return {@code True} if it is a compound waiter.
*/
boolean compound();
+
+ /**
+ * Looks up the lock future registered by the given transaction within this waiter.
+ *
+ * @param checkedVer Version of the transaction which is checked for waiting.
+ * @return Lock future of the checked transaction or {@code null} if this waiter holds no such future.
+ */
+ @Nullable GridFutureAdapter<?> lockFuture(MvccVersion checkedVer);
}
/** */
private static class LockFuture extends GridFutureAdapter<Void> implements Waiter, Runnable {
/** */
private final byte plc;
+ /** */
+ private final MvccVersion waitingTxVer;
/**
* @param plc Pool policy.
+ * @param waitingTxVer Waiting tx version.
*/
- LockFuture(byte plc) {
+ LockFuture(byte plc, MvccVersion waitingTxVer) {
this.plc = plc;
+ this.waitingTxVer = waitingTxVer;
}
/** {@inheritDoc} */
@@ -1804,7 +1839,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
/** {@inheritDoc} */
@Override public void run(GridKernalContext ctx) {
try {
- ctx.pools().poolForPolicy(plc).execute(this);
+ if (!isDone())
+ ctx.pools().poolForPolicy(plc).execute(this);
}
catch (IgniteCheckedException e) {
U.error(ctx.log(LockFuture.class), e);
@@ -1825,6 +1861,11 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
@Override public boolean compound() {
return false;
}
+
+ /** {@inheritDoc} */
+ @Override public GridFutureAdapter<?> lockFuture(MvccVersion checkedVer) {
+ // This future matches only the waiting transaction it was created for.
+ if (!belongToSameTx(waitingTxVer, checkedVer))
+ return null;
+
+ return this;
+ }
}
/** */
@@ -1848,6 +1889,11 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
@Override public boolean compound() {
return false;
}
+
+ /** {@inheritDoc} */
+ @Override public GridFutureAdapter<?> lockFuture(MvccVersion checkedVer) {
+ // This waiter kind never matches a checked transaction.
+ return null;
+ }
}
/** */
@@ -1888,9 +1934,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
/** {@inheritDoc} */
@Override public void run(GridKernalContext ctx) {
if (inner.getClass() == ArrayList.class) {
- for (Waiter waiter : (List<Waiter>)inner) {
+ for (Waiter waiter : (List<Waiter>)inner)
waiter.run(ctx);
- }
}
else
((Waiter)inner).run(ctx);
@@ -1910,6 +1955,22 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
@Override public boolean compound() {
return true;
}
+
+ /** {@inheritDoc} */
+ @Override public GridFutureAdapter<?> lockFuture(MvccVersion checkedVer) {
+ // inner is either a single waiter or an ArrayList of waiters.
+ if (inner.getClass() != ArrayList.class)
+ return ((Waiter)inner).lockFuture(checkedVer);
+
+ for (Waiter waiter : (List<Waiter>)inner) {
+ GridFutureAdapter<?> fut = waiter.lockFuture(checkedVer);
+
+ if (fut != null)
+ return fut;
+ }
+
+ return null;
+ }
}
/** */
@@ -2250,7 +2311,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
part.dataStore().updateTxState(cctx, row);
}
else
- part.dataStore().updateTxState(cctx, (MvccDataRow) rest);
+ part.dataStore().updateTxState(cctx, (MvccDataRow)rest);
}
}
finally {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
index ac2818d..7047233 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
@@ -885,6 +885,15 @@ public class MvccUtils {
}
/**
+ * Checks whether two MVCC versions belong to the same transaction, i.e. have equal coordinator versions
+ * and equal counters.
+ *
+ * @param v1 First MVCC version.
+ * @param v2 Second MVCC version.
+ * @return {@code True} if compared versions belong to the same transaction.
+ */
+ public static boolean belongToSameTx(MvccVersion v1, MvccVersion v2) {
+ if (v1.coordinatorVersion() != v2.coordinatorVersion())
+ return false;
+
+ return v1.counter() == v2.counter();
+ }
+
+ /**
* Mvcc closure interface.
* @param <R> Return type.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/ProbedTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/ProbedTx.java
new file mode 100644
index 0000000..a13a37d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/ProbedTx.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Contains attributes of tx visited during deadlock detection.
+ * <p>
+ * Sent as a direct {@link Message}; fields are serialized in the fixed order
+ * lockCntr, nearXidVer, nodeId, startTime, xidVer (see {@link #writeTo} and {@link #readFrom}).
+ */
+public class ProbedTx implements Message {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0;
+
+ /** Node on which probed transaction runs. */
+ private UUID nodeId;
+ /** Identifier of transaction. */
+ private GridCacheVersion xidVer;
+ /** Identifier of near transaction. */
+ private GridCacheVersion nearXidVer;
+ /** Transaction start time. */
+ private long startTime;
+ /** Number of locks acquired by probed transaction at a time of probe handling. */
+ private int lockCntr;
+
+ /**
+ * Empty constructor; fields are populated afterwards by {@link #readFrom}.
+ * NOTE(review): presumably required by the communication message factory — confirm.
+ */
+ public ProbedTx() {
+ }
+
+ /**
+ * @param nodeId Node on which probed transaction runs.
+ * @param xidVer Identifier of transaction.
+ * @param nearXidVer Identifier of near transaction.
+ * @param startTime Transaction start time.
+ * @param lockCntr Number of locks acquired by probed transaction at a time of probe handling.
+ */
+ public ProbedTx(UUID nodeId, GridCacheVersion xidVer, GridCacheVersion nearXidVer, long startTime,
+ int lockCntr) {
+ this.nodeId = nodeId;
+ this.xidVer = xidVer;
+ this.nearXidVer = nearXidVer;
+ this.startTime = startTime;
+ this.lockCntr = lockCntr;
+ }
+
+ /**
+ * @return Node on which probed transaction runs.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @return Identifier of transaction.
+ */
+ public GridCacheVersion xidVersion() {
+ return xidVer;
+ }
+
+ /**
+ * @return Identifier of near transaction.
+ */
+ public GridCacheVersion nearXidVersion() {
+ return nearXidVer;
+ }
+
+ /**
+ * @return Transaction start time.
+ */
+ public long startTime() {
+ return startTime;
+ }
+
+ /**
+ * @return Number of locks acquired by probed transaction at a time of probe handling.
+ */
+ public int lockCounter() {
+ return lockCntr;
+ }
+
+ /**
+ * Creates a copy of this instance with modified transaction start time.
+ * This instance itself is left unchanged.
+ *
+ * @param updStartTime New start time value.
+ * @return Instance with updated start time.
+ */
+ public ProbedTx withStartTime(long updStartTime) {
+ return new ProbedTx(
+ nodeId,
+ xidVer,
+ nearXidVer,
+ updStartTime,
+ lockCntr
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ // Cases intentionally fall through. The writer state is advanced only after a field is written
+ // successfully, so a call that returned false resumes from the same field next time.
+ // Field order must stay in sync with readFrom().
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeInt("lockCntr", lockCntr))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeMessage("nearXidVer", nearXidVer))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeUuid("nodeId", nodeId))
+ return false;
+
+ writer.incrementState();
+
+ case 3:
+ if (!writer.writeLong("startTime", startTime))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeMessage("xidVer", xidVer))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ // Mirrors writeTo(): cases fall through, and the reader state is advanced only after a field
+ // has been read completely, so an incomplete read resumes from the same field.
+ switch (reader.state()) {
+ case 0:
+ lockCntr = reader.readInt("lockCntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ nearXidVer = reader.readMessage("nearXidVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ nodeId = reader.readUuid("nodeId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 3:
+ startTime = reader.readLong("startTime");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ xidVer = reader.readMessage("xidVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(ProbedTx.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ // NOTE(review): must match the id under which this message is registered for deserialization — confirm.
+ return 171;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ // Number of fields handled by writeTo()/readFrom() (cases 0..4).
+ return 5;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ProbedTx.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index 6e5759d..39ece36 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -368,8 +368,8 @@ public class IgniteWalIteratorFactory {
return new GridCacheSharedContext<>(
kernalCtx, null, null, null,
null, null, null, dbMgr, null,
- null, null, null, null,
- null, null,null, null, null
+ null, null, null, null, null,
+ null,null, null, null, null
);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
index 550ec09..b12ee56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
@@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
import org.jetbrains.annotations.Nullable;
@@ -38,6 +39,9 @@ public class TxCounters {
/** Final update counters for cache partitions in the end of transaction */
private Collection<PartitionUpdateCountersMessage> updCntrs;
+ /** Number of entries locked by the transaction so far; starts at zero. */
+ private final AtomicInteger lockCntr = new AtomicInteger(0);
+
/**
* Accumulates size change for cache partition.
*
@@ -127,4 +131,18 @@ public class TxCounters {
return acc;
}
+
+ /**
+ * Registers one more entry locked by the transaction.
+ */
+ public void incrementLockCounter() {
+ lockCntr.getAndIncrement();
+ }
+
+ /**
+ * @return Number of entries locked by the transaction so far.
+ */
+ public int lockCounter() {
+ return lockCntr.intValue();
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
index a0eff32..62bc06a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
@@ -242,7 +242,7 @@ public class IgniteDiagnosticMessagesTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-9322") // Fix diagnostic message or disable test.
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-10637") // Support diagnostics message or disable test.
@Test
public void testSeveralLongRunningMvccTxs() throws Exception {
checkSeveralLongRunningTxs(TRANSACTIONAL_SNAPSHOT);
@@ -364,7 +364,7 @@ public class IgniteDiagnosticMessagesTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-9322") // Fix diagnostic message or disable test.
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-10637") // Support diagnostic messages or disable test.
@Test
public void testLongRunningMvccTx() throws Exception {
checkLongRunningTx(TRANSACTIONAL_SNAPSHOT);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
index 0f6c03f..cc1cf43 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
@@ -151,6 +151,7 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest {
null,
null,
null,
+ null,
null)
).createSerializer(serVer);
@@ -476,6 +477,7 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest {
null,
null,
null,
+ null,
null
);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
index f51056f..d48cb0f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
@@ -80,6 +80,7 @@ public class BPlusTreePageMemoryImplTest extends BPlusTreeSelfTest {
null,
null,
null,
+ null,
null
);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
index 9a7d63b..333ff63 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
@@ -80,6 +80,7 @@ public class BPlusTreeReuseListPageMemoryImplTest extends BPlusTreeReuseSelfTest
null,
null,
null,
+ null,
null
);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
index cbf9dea..89b0cdf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
@@ -95,6 +95,7 @@ public class IndexStoragePageMemoryImplTest extends IndexStorageSelfTest {
null,
null,
null,
+ null,
null
);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
index 1190899..e8f6ead 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
@@ -89,6 +89,7 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
null,
null,
null,
+ null,
null
);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index 7591dd7..fea7d2b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -315,6 +315,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
null,
null,
null,
+ null,
null
);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTopologyChangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTopologyChangeTest.java
index 03e72d5..b030890 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTopologyChangeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTopologyChangeTest.java
@@ -93,7 +93,7 @@ public class TxRollbackOnTopologyChangeTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
if (MvccFeatureChecker.forcedMvcc())
- fail("https://issues.apache.org/jira/browse/IGNITE-9322"); //Won't start nodes if the only test mutes.
+ fail("https://issues.apache.org/jira/browse/IGNITE-10807"); //Won't start nodes if the only test mutes.
super.beforeTest();
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index ad04ded..c6d9e18 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -83,6 +83,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
new PartitionsEvictManager(),
new CacheNoopJtaManager(),
null,
+ null,
null
),
defaultCacheConfiguration(),
@@ -110,4 +111,4 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
store().initialize(null, new IdentityHashMap<CacheStore, ThreadLocal>());
}
-}
\ No newline at end of file
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesTest.java
index 199cfad..02f091d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesTest.java
@@ -24,7 +24,7 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED;
/** */
public class CacheMvccPartitionedSqlTxQueriesTest extends CacheMvccSqlTxQueriesAbstractTest {
/** {@inheritDoc} */
- protected CacheMode cacheMode() {
+ @Override protected CacheMode cacheMode() {
+ // Selects PARTITIONED mode for the tests inherited from CacheMvccSqlTxQueriesAbstractTest.
return PARTITIONED;
}
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
index 5841b09..7f0dcf0 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
@@ -67,6 +67,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL_SUM;
@@ -81,6 +82,20 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
*/
@RunWith(JUnit4.class)
public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ // NOTE(review): a negative initial delay presumably disables MVCC deadlock detection for this
+ // suite, so lock waits behave as before the detector was added — confirm in IgniteSystemProperties.
+ System.setProperty(IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY, "-1");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ // Clears the property unconditionally; a value set outside this class (e.g. via -D) is not restored.
+ System.clearProperty(IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY);
+
+ super.afterTestsStopped();
+ }
+
/**
* @throws Exception If failed.
*/
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java
index 684631a..f0a586a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java
@@ -44,6 +44,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
import static org.apache.ignite.testframework.GridTestUtils.runMultiThreaded;
@@ -55,6 +56,20 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
*/
@RunWith(JUnit4.class)
public abstract class CacheMvccSqlTxQueriesWithReducerAbstractTest extends CacheMvccAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ // NOTE(review): a negative initial delay presumably disables MVCC deadlock detection for this
+ // suite, so lock waits behave as before the detector was added — confirm in IgniteSystemProperties.
+ System.setProperty(IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY, "-1");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ // Clears the property unconditionally; a value set outside this class (e.g. via -D) is not restored.
+ System.clearProperty(IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY);
+
+ super.afterTestsStopped();
+ }
+
/** */
private static final int TIMEOUT = 3000;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccDeadlockDetectionTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccDeadlockDetectionTest.java
new file mode 100644
index 0000000..2a2d3ca
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccDeadlockDetectionTest.java
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
+import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.GridTestUtils.SF;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/** */
+@RunWith(JUnit4.class)
+public class MvccDeadlockDetectionTest extends GridCommonAbstractTest {
+ /** */
+ @BeforeClass
+ public static void setUpClass() {
+ System.setProperty(IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY, "0");
+ }
+
+ /** */
+ @AfterClass
+ public static void tearDownClass() {
+ System.clearProperty(IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY);
+ }
+
+ /** */
+ private IgniteEx client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ return cfg;
+ }
+
+ /** */
+ private void setUpGrids(int n, boolean indexed) throws Exception {
+ Ignite ign = startGridsMultiThreaded(n);
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setAtomicityMode(TRANSACTIONAL_SNAPSHOT);
+ if (indexed)
+ ccfg.setIndexedTypes(Integer.class, Integer.class);
+
+ ign.getOrCreateCache(ccfg);
+
+ G.setClientMode(true);
+
+ client = startGrid(n);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @After
+ public void cleanupTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void detectSimpleDeadlock() throws Exception {
+ setUpGrids(2, false);
+
+ Integer key0 = primaryKey(grid(0).cache(DEFAULT_CACHE_NAME));
+ Integer key1 = primaryKey(grid(1).cache(DEFAULT_CACHE_NAME));
+
+ IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ assert client.configuration().isClientMode();
+
+ CyclicBarrier b = new CyclicBarrier(2);
+
+ IgniteInternalFuture<Object> fut0 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key0, 0);
+ b.await();
+ cache.put(key1, 1);
+
+ tx.commit();
+ }
+
+ return null;
+ });
+
+ IgniteInternalFuture<Object> fut1 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key1, 1);
+ b.await();
+ cache.put(key0, 0);
+
+ tx.commit();
+ }
+
+ return null;
+ });
+
+ assertExactlyOneAbortedDueDeadlock(fut0, fut1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void detectSimpleDeadlockFastUpdate() throws Exception {
+ setUpGrids(2, true);
+
+ IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ Integer key0 = primaryKey(grid(0).cache(DEFAULT_CACHE_NAME));
+ Integer key1 = primaryKey(grid(1).cache(DEFAULT_CACHE_NAME));
+
+ cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, ?)").setArgs(key0, -1));
+ cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, ?)").setArgs(key1, -1));
+
+ assert client.configuration().isClientMode();
+
+ CyclicBarrier b = new CyclicBarrier(2);
+
+ IgniteInternalFuture<Object> fut0 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.query(new SqlFieldsQuery("update Integer set _val = 0 where _key = ?").setArgs(key0));
+ b.await();
+ cache.query(new SqlFieldsQuery("update Integer set _val = 0 where _key = ?").setArgs(key1));
+
+ tx.commit();
+ }
+
+ return null;
+ });
+
+ IgniteInternalFuture<Object> fut1 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.query(new SqlFieldsQuery("update Integer set _val = 1 where _key = ?").setArgs(key1));
+ b.await();
+ cache.query(new SqlFieldsQuery("update Integer set _val = 1 where _key = ?").setArgs(key0));
+
+ tx.commit();
+ }
+
+ return null;
+ });
+
+ assertExactlyOneAbortedDueDeadlock(fut0, fut1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void detect3Deadlock() throws Exception {
+ setUpGrids(3, false);
+
+ Integer key0 = primaryKey(grid(0).cache(DEFAULT_CACHE_NAME));
+ Integer key1 = primaryKey(grid(1).cache(DEFAULT_CACHE_NAME));
+ Integer key2 = primaryKey(grid(2).cache(DEFAULT_CACHE_NAME));
+
+ IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ assert client.configuration().isClientMode();
+
+ CyclicBarrier b = new CyclicBarrier(3);
+
+ IgniteInternalFuture<Object> fut0 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key0, 0);
+ b.await();
+ cache.put(key1, 1);
+
+ tx.rollback();
+ }
+
+ return null;
+ });
+
+ IgniteInternalFuture<Object> fut1 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key1, 0);
+ b.await();
+ cache.put(key2, 1);
+
+ tx.rollback();
+ }
+
+ return null;
+ });
+
+ IgniteInternalFuture<Object> fut2 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key2, 1);
+ b.await();
+ cache.put(key0, 0);
+
+ tx.rollback();
+ }
+
+ return null;
+ });
+
+ assertExactlyOneAbortedDueDeadlock(fut0, fut1, fut2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void detectMultipleLockWaitDeadlock() throws Exception {
+ // T0 -> T1
+ // \-> T2 -> T0
+ setUpGrids(3, true);
+
+ IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ Integer key0 = primaryKey(grid(0).cache(DEFAULT_CACHE_NAME));
+ Integer key1 = primaryKey(grid(1).cache(DEFAULT_CACHE_NAME));
+ Integer key2 = primaryKey(grid(2).cache(DEFAULT_CACHE_NAME));
+
+ cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, ?)").setArgs(key0, -1));
+ cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, ?)").setArgs(key1, -1));
+ cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, ?)").setArgs(key2, -1));
+
+ CyclicBarrier b = new CyclicBarrier(3);
+
+ IgniteInternalFuture<Object> fut2 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.query(new SqlFieldsQuery("update Integer set _val = 2 where _key = ?").setArgs(key2));
+ b.await();
+ cache.query(new SqlFieldsQuery("update Integer set _val = 2 where _key = ?").setArgs(key0));
+
+ // rollback to prevent waiting tx abort due write conflict
+ tx.rollback();
+ }
+
+ return null;
+ });
+
+ IgniteInternalFuture<Object> fut1 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.query(new SqlFieldsQuery("update Integer set _val = 1 where _key = ?").setArgs(key1));
+ b.await();
+ GridTestUtils.waitForCondition(fut2::isDone, 1000);
+
+ // rollback to prevent waiting tx abort due write conflict
+ tx.rollback();
+ }
+
+ return null;
+ });
+
+ IgniteInternalFuture<Object> fut0 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.query(new SqlFieldsQuery("update Integer set _val = 0 where _key = ?").setArgs(key0));
+ b.await();
+ cache.query(
+ new SqlFieldsQuery("update Integer set _val = 0 where _key = ? or _key = ?").setArgs(key2, key1));
+
+ tx.commit();
+ }
+
+ return null;
+ });
+
+ fut1.get(10, TimeUnit.SECONDS);
+
+ assertExactlyOneAbortedDueDeadlock(fut0, fut2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void detectDeadlockLocalEntriesEnlistFuture() throws Exception {
+ setUpGrids(1, false);
+
+ List<Integer> keys = primaryKeys(grid(0).cache(DEFAULT_CACHE_NAME), 2);
+
+ IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ assert client.configuration().isClientMode();
+
+ CyclicBarrier b = new CyclicBarrier(2);
+
+ IgniteInternalFuture<Object> fut0 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(keys.get(0), 11);
+ b.await();
+ cache.put(keys.get(1), 11);
+
+ tx.commit();
+ }
+
+ return null;
+ });
+
+ IgniteInternalFuture<Object> fut1 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(keys.get(1), 22);
+ b.await();
+ cache.put(keys.get(0), 22);
+
+ tx.commit();
+ }
+
+ return null;
+ });
+
+ assertExactlyOneAbortedDueDeadlock(fut0, fut1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void detectDeadlockLocalPrimary() throws Exception {
+ // Checks that case when near tx does local on enlist on the same node and no dht tx is created
+
+ setUpGrids(2, false);
+
+ IgniteCache<Object, Object> cache0 = grid(0).cache(DEFAULT_CACHE_NAME);
+ IgniteCache<Object, Object> cache1 = grid(1).cache(DEFAULT_CACHE_NAME);
+
+ int key0 = primaryKey(cache0);
+ int key1 = primaryKey(cache1);
+
+ CyclicBarrier b = new CyclicBarrier(2);
+
+ IgniteInternalFuture<Object> fut0 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache0.put(key1, 11);
+ b.await();
+ cache0.put(key0, 11);
+
+ tx.commit();
+ }
+
+ return null;
+ });
+
+ IgniteInternalFuture<Object> fut1 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = grid(1).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache1.put(key0, 22);
+ b.await();
+ cache1.put(key1, 22);
+
+ tx.commit();
+ }
+
+ return null;
+ });
+
+ assertExactlyOneAbortedDueDeadlock(fut0, fut1);
+ }
+
+ /**
+ * Checks deadlock detection for SQL DML updates issued from a client on a single-server grid.
+ * Two rows are inserted up front; two client txs then update them in opposite order.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void detectDeadlockLocalQueryEnlistFuture() throws Exception {
+ setUpGrids(1, true);
+
+ List<Integer> keys = primaryKeys(grid(0).cache(DEFAULT_CACHE_NAME), 2);
+
+ // Ascending order, so "_key <= lo" and "_key >= hi" each match exactly one of the inserted rows.
+ keys.sort(null);
+
+ Integer lo = keys.get(0);
+ Integer hi = keys.get(1);
+
+ IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ assert client.configuration().isClientMode();
+
+ String insertSql = "insert into Integer(_key, _val) values(?, ?)";
+
+ cache.query(new SqlFieldsQuery(insertSql).setArgs(lo, -1));
+ cache.query(new SqlFieldsQuery(insertSql).setArgs(hi, -1));
+
+ CyclicBarrier barrier = new CyclicBarrier(2);
+
+ IgniteInternalFuture<Object> txFut0 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.query(new SqlFieldsQuery("update Integer set _val = 0 where _key <= ?").setArgs(lo));
+ barrier.await();
+ cache.query(new SqlFieldsQuery("update Integer set _val = 0 where _key >= ?").setArgs(hi));
+ // NOTE(review): the purpose of this pause before commit is not evident from this method; kept as-is.
+ TimeUnit.SECONDS.sleep(2);
+
+ tx.commit();
+ }
+
+ return null;
+ });
+
+ IgniteInternalFuture<Object> txFut1 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.query(new SqlFieldsQuery("update Integer set _val = 1 where _key >= ?").setArgs(hi));
+ barrier.await();
+ cache.query(new SqlFieldsQuery("update Integer set _val = 1 where _key <= ?").setArgs(lo));
+
+ tx.commit();
+ }
+
+ return null;
+ });
+
+ assertExactlyOneAbortedDueDeadlock(txFut0, txFut1);
+ }
+
+ /**
+ * Two client txs deadlock on one key per server node. Each tx has its own deadlock probes held back
+ * (see blockProbe), and the main thread meanwhile keeps trying to write the first key. Judging by the
+ * test name, that third, non-deadlocked tx is expected to be the one that exposes the deadlock.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void nonDeadlockedTxDetectsDeadlock1() throws Exception {
+ setUpGrids(2, false);
+
+ Integer keyOn0 = primaryKey(grid(0).cache(DEFAULT_CACHE_NAME));
+ Integer keyOn1 = primaryKey(grid(1).cache(DEFAULT_CACHE_NAME));
+
+ IgniteCache<Object, Object> clientCache = client.cache(DEFAULT_CACHE_NAME);
+
+ assert client.configuration().isClientMode();
+
+ // Parties: both tx threads plus the main thread.
+ CyclicBarrier barrier = new CyclicBarrier(3);
+
+ IgniteInternalFuture<Object> txFut0 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ // Block probes initiated by this tx that go through grid(1)'s communication SPI.
+ blockProbe(grid(1), tx);
+
+ clientCache.put(keyOn0, 0);
+ barrier.await();
+ clientCache.put(keyOn1, 1);
+
+ tx.rollback();
+ }
+
+ return null;
+ });
+
+ IgniteInternalFuture<Object> txFut1 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ // Block probes initiated by this tx that go through grid(0)'s communication SPI.
+ blockProbe(grid(0), tx);
+
+ clientCache.put(keyOn1, 1);
+ barrier.await();
+ clientCache.put(keyOn0, 0);
+
+ tx.rollback();
+ }
+
+ return null;
+ });
+
+ // Once both txs hold their first key, contend on keyOn0 from an unrelated tx.
+ barrier.await();
+ tryPutRepeatedly(clientCache, keyOn0);
+
+ assertExactlyOneAbortedDueDeadlock(txFut0, txFut1);
+ }
+
+ /**
+ * Variant of nonDeadlockedTxDetectsDeadlock1. Here the second tx additionally holds another key on
+ * node 0 ({@code key01}) before the barrier. The main thread then contends on that extra key rather
+ * than on a key directly involved in the deadlock cycle.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void nonDeadlockedTxDetectsDeadlock2() throws Exception {
+ setUpGrids(2, false);
+
+ List<Integer> node0Keys = primaryKeys(grid(0).cache(DEFAULT_CACHE_NAME), 2);
+
+ Integer firstKeyOn0 = node0Keys.get(0);
+ Integer secondKeyOn0 = node0Keys.get(1);
+ Integer keyOn1 = primaryKey(grid(1).cache(DEFAULT_CACHE_NAME));
+
+ IgniteCache<Object, Object> clientCache = client.cache(DEFAULT_CACHE_NAME);
+
+ assert client.configuration().isClientMode();
+
+ // Parties: both tx threads plus the main thread.
+ CyclicBarrier barrier = new CyclicBarrier(3);
+
+ IgniteInternalFuture<Object> txFut0 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ blockProbe(grid(1), tx);
+
+ clientCache.put(firstKeyOn0, 0);
+ barrier.await();
+ clientCache.put(keyOn1, 1);
+
+ tx.rollback();
+ }
+
+ return null;
+ });
+
+ IgniteInternalFuture<Object> txFut1 = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ blockProbe(grid(0), tx);
+
+ clientCache.put(keyOn1, 1);
+ clientCache.put(secondKeyOn0, 0);
+ barrier.await();
+ clientCache.put(firstKeyOn0, 0);
+
+ tx.rollback();
+ }
+
+ return null;
+ });
+
+ barrier.await();
+ tryPutRepeatedly(clientCache, secondKeyOn0);
+
+ assertExactlyOneAbortedDueDeadlock(txFut0, txFut1);
+ }
+
+ /**
+ * Randomized stress check. Two workers per node repeatedly start pessimistic txs, put a random subset
+ * of the keys in random order and roll back. Txs that fail with {@link IgniteTxRollbackCheckedException}
+ * somewhere in the cause chain are counted as aborted. Any other failure is logged rather than silently
+ * dropped.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void randomizedPuts() throws Exception {
+ int gridCnt = SF.applyLB(10, 2);
+ int opsByWorker = SF.applyLB(1000, 10);
+
+ setUpGrids(gridCnt, false);
+
+ // Three primary keys per node, i.e. 3 * gridCnt keys in total.
+ List<Integer> keys = new ArrayList<>();
+ for (int i = 0; i < gridCnt; i++)
+ keys.addAll(primaryKeys(grid(i).cache(DEFAULT_CACHE_NAME), 3));
+
+ AtomicInteger aborted = new AtomicInteger();
+
+ List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+ for (int i = 0; i < gridCnt * 2; i++) {
+ IgniteEx ign = grid(i % gridCnt);
+ IgniteCache<Object, Object> cache = ign.cache(DEFAULT_CACHE_NAME);
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+ for (int k = 0; k < opsByWorker; k++) {
+ try (Transaction tx = ign.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ List<Integer> keys0 = new ArrayList<>(keys);
+ Collections.shuffle(keys0);
+ // Aim for 5..12 keys, but never more than exist. SF may scale gridCnt down
+ // (presumably to the lower bound 2, i.e. only 6 keys). An unclamped count then
+ // threw IndexOutOfBoundsException, which the catch below silently swallowed.
+ int nkeys = Math.min(ThreadLocalRandom.current().nextInt(8) + 5, keys0.size());
+ for (int j = 0; j < nkeys; j++)
+ cache.put(keys0.get(j), j);
+
+ tx.rollback();
+ }
+ catch (Exception e) {
+ if (X.hasCause(e, IgniteTxRollbackCheckedException.class))
+ aborted.incrementAndGet();
+ else
+ log.warning("Unexpected exception in randomized tx worker", e);
+ }
+ }
+ });
+ futs.add(fut);
+ }
+
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.get(10, TimeUnit.MINUTES);
+
+ log.info("Number of txs aborted: " + aborted);
+ }
+
+ /**
+ * Installs a block predicate on {@code ign}'s {@link TestRecordingCommunicationSpi}. The predicate matches
+ * every {@link DeadlockProbe} whose initiator version equals the xid version of {@code tx}'s underlying near tx.
+ *
+ * @param ign Node whose communication SPI gets the predicate.
+ * @param tx Transaction whose probes are blocked; expected to be a {@link TransactionProxyImpl}.
+ */
+ private static void blockProbe(IgniteEx ign, Transaction tx) {
+ TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi)ign.configuration().getCommunicationSpi();
+
+ spi.blockMessages((node, msg) -> {
+ if (!(msg instanceof DeadlockProbe))
+ return false;
+
+ GridNearTxLocal nearTx = ((TransactionProxyImpl)tx).tx();
+
+ return ((DeadlockProbe)msg).initiatorVersion().equals(nearTx.xidVersion());
+ });
+ }
+
+ /**
+ * Waits up to 10 seconds for each future. It asserts that exactly one of them failed with an
+ * {@link IgniteTxRollbackCheckedException} somewhere in its cause chain.
+ *
+ * @param futs Futures of the competing transactions; must not be empty.
+ * @throws IgniteCheckedException If a future fails for any other reason (including a wait timeout).
+ */
+ private void assertExactlyOneAbortedDueDeadlock(IgniteInternalFuture<?>... futs) throws IgniteCheckedException {
+ assert futs.length > 0;
+
+ int abortedCnt = 0;
+
+ for (int i = 0; i < futs.length; i++) {
+ try {
+ futs[i].get(10, TimeUnit.SECONDS);
+ }
+ catch (IgniteCheckedException e) {
+ // TODO check expected exceptions once https://issues.apache.org/jira/browse/IGNITE-9470 is resolved
+ if (!X.hasCause(e, IgniteTxRollbackCheckedException.class))
+ throw e;
+
+ abortedCnt++;
+ }
+ }
+
+ if (abortedCnt != 1)
+ fail("Exactly one tx is expected to be aborted, but was " + abortedCnt);
+ }
+
+ /**
+ * Makes up to 100 attempts to put {@code 33} under {@code key0} from a client pessimistic tx started with
+ * {@code txStart(..., 200, 1)} (presumably a 200 ms timeout and a tx size of 1). It stops after the first
+ * attempt whose put and tx close complete normally. Any exception just moves on to the next attempt.
+ * The tx is never committed explicitly.
+ *
+ * @param cache Client cache to write through.
+ * @param key0 Key to contend on.
+ */
+ private void tryPutRepeatedly(IgniteCache<Object, Object> cache, Integer key0) {
+ int attempt = 0;
+
+ while (attempt++ < 100) {
+ try (Transaction ignoredTx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 200, 1)) {
+ cache.put(key0, 33);
+
+ return;
+ }
+ catch (Exception ignored) {
+ // Best effort: a failed attempt is simply retried.
+ }
+ }
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
index 0418914..d1ed2f3 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlUpdateCounte
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccStreamingInsertTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxNodeMappingTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccDeadlockDetectionTest;
import org.apache.ignite.internal.processors.cache.mvcc.MvccRepeatableReadBulkOpsTest;
import org.apache.ignite.internal.processors.cache.mvcc.MvccRepeatableReadOperationsTest;
import org.apache.ignite.internal.processors.query.h2.GridIndexRebuildWithMvccEnabledSelfTest;
@@ -93,6 +94,8 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
CacheMvccTxNodeMappingTest.class,
+ MvccDeadlockDetectionTest.class,
+
// SQL vs CacheAPI consistency.
MvccRepeatableReadOperationsTest.class,
MvccRepeatableReadBulkOpsTest.class,