You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/08/31 06:33:10 UTC
[20/38] ignite git commit: ignite-2968 Deadlock detection for
optimistic tx and near caches
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 63c9919..f9357f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -50,7 +50,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
@@ -62,6 +61,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemo
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockFuture;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearOptimisticTxPrepareFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.TxDeadlockFuture;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -126,7 +127,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
public static final IgniteProductVersion TX_DEADLOCK_DETECTION_SINCE = IgniteProductVersion.fromString("1.5.19");
/** Deadlock detection maximum iterations. */
- static final int DEADLOCK_MAX_ITERS =
+ static int DEADLOCK_MAX_ITERS =
IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000);
/** Committing transactions. */
@@ -389,7 +390,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* {@code false} otherwise.
*/
public boolean isCompleted(IgniteInternalTx tx) {
- return completedVersHashMap.containsKey(tx.xidVersion());
+ boolean completed = completedVersHashMap.containsKey(tx.xidVersion());
+
+ // For a tx with a timeout, also check that a rollback message was not received before the lock.
+ if (!completed && tx.local() && tx.dht() && tx.timeout() > 0)
+ return completedVersHashMap.containsKey(tx.nearXidVersion());
+
+ return completed;
}
/**
@@ -495,13 +502,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
return null;
}
- if (tx.timeout() > 0) {
- cctx.time().addTimeoutObject(tx);
-
- if (log.isDebugEnabled())
- log.debug("Registered transaction with timeout processor: " + tx);
- }
-
if (log.isDebugEnabled())
log.debug("Transaction created: " + tx);
@@ -786,7 +786,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
*/
public void prepareTx(IgniteInternalTx tx) throws IgniteCheckedException {
if (tx.state() == MARKED_ROLLBACK) {
- if (tx.timedOut())
+ if (tx.remainingTime() == -1)
throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this);
throw new IgniteCheckedException("Transaction is marked for rollback: " + tx);
@@ -1081,13 +1081,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (log.isDebugEnabled())
log.debug("Committing from TM [locNodeId=" + cctx.localNodeId() + ", tx=" + tx + ']');
- if (tx.timeout() > 0) {
- cctx.time().removeTimeoutObject(tx);
-
- if (log.isDebugEnabled())
- log.debug("Unregistered transaction with timeout processor: " + tx);
- }
-
/*
* Note that write phase is handled by transaction adapter itself,
* so we don't do it here.
@@ -2006,17 +1999,45 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (!(nearTxLoc || tx instanceof GridDhtTxLocal) || !hasKeys(tx, txKeys))
continue;
- Collection<IgniteTxEntry> txEntries = tx.allEntries();
+ IgniteTxState state = tx.txState();
+
+ assert state instanceof IgniteTxStateImpl || state instanceof IgniteTxImplicitSingleStateImpl;
+
+ Collection<IgniteTxEntry> txEntries =
+ state instanceof IgniteTxStateImpl ? ((IgniteTxStateImpl)state).allEntriesCopy() : state.allEntries();
- Set<KeyCacheObject> requestedKeys = null;
+ Set<IgniteTxKey> requestedKeys = null;
// Try to get info about requested keys for detached entries in case of GridNearTxLocal transaction
// in order to reduce amount of requests to remote nodes.
if (nearTxLoc) {
- GridDhtColocatedLockFuture fut = colocatedLockFuture(tx);
+ if (tx.pessimistic()) {
+ GridDhtColocatedLockFuture fut =
+ (GridDhtColocatedLockFuture)mvccFuture(tx, GridDhtColocatedLockFuture.class);
+
+ if (fut != null)
+ requestedKeys = fut.requestedKeys();
+
+ GridNearLockFuture nearFut = (GridNearLockFuture)mvccFuture(tx, GridNearLockFuture.class);
- if (fut != null)
- requestedKeys = fut.requestedKeys();
+ if (nearFut != null) {
+ Set<IgniteTxKey> nearRequestedKeys = nearFut.requestedKeys();
+
+ if (nearRequestedKeys != null) {
+ if (requestedKeys == null)
+ requestedKeys = nearRequestedKeys;
+ else
+ requestedKeys = nearRequestedKeys;
+ }
+ }
+ }
+ else {
+ GridNearOptimisticTxPrepareFuture fut =
+ (GridNearOptimisticTxPrepareFuture)mvccFuture(tx, GridNearOptimisticTxPrepareFuture.class);
+
+ if (fut != null)
+ requestedKeys = fut.requestedKeys();
+ }
}
for (IgniteTxEntry txEntry : txEntries) {
@@ -2073,17 +2094,18 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/**
* @param tx Tx. Must be instance of {@link GridNearTxLocal}.
- * @return Colocated future.
+ * @param cls Future class.
+ * @return Cache future.
*/
- private GridDhtColocatedLockFuture colocatedLockFuture(IgniteInternalTx tx) {
+ private IgniteInternalFuture mvccFuture(IgniteInternalTx tx, Class<? extends IgniteInternalFuture> cls) {
assert tx instanceof GridNearTxLocal : tx;
Collection<GridCacheMvccFuture<?>> futs = cctx.mvcc().mvccFutures(tx.nearXidVersion());
if (futs != null) {
for (GridCacheMvccFuture<?> fut : futs) {
- if (fut instanceof GridDhtColocatedLockFuture)
- return (GridDhtColocatedLockFuture)fut;
+ if (fut.getClass().equals(cls))
+ return fut;
}
}
@@ -2115,6 +2137,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @return Collection of active transaction deadlock detection futures.
+ */
+ @SuppressWarnings("unchecked")
+ public Collection<IgniteInternalFuture<?>> deadlockDetectionFutures() {
+ Collection<? extends IgniteInternalFuture<?>> values = deadlockDetectFuts.values();
+
+ return (Collection<IgniteInternalFuture<?>>)values;
+ }
+
+ /**
* Timeout object for node failure handler.
*/
private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter {
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index c116d0d..f23cca9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.transactions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.ignite.IgniteCheckedException;
@@ -39,6 +40,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedHashMap;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -362,6 +364,13 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
return txMap == null ? Collections.<IgniteTxEntry>emptySet() : txMap.values();
}
+ /**
+ * @return All entries. The returned collection is a copy of the internal collection.
+ */
+ public synchronized Collection<IgniteTxEntry> allEntriesCopy() {
+ return txMap == null ? Collections.<IgniteTxEntry>emptySet() : new HashSet<>(txMap.values());
+ }
+
/** {@inheritDoc} */
@Override public IgniteTxEntry entry(IgniteTxKey key) {
return txMap == null ? null : txMap.get(key);
@@ -408,7 +417,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
}
/** {@inheritDoc} */
- @Override public void addEntry(IgniteTxEntry entry) {
+ @Override public synchronized void addEntry(IgniteTxEntry entry) {
txMap.put(entry.txKey(), entry);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java
index 36843dd..70d938e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java
@@ -36,8 +36,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT;
@@ -106,22 +109,30 @@ public class TxDeadlockDetection {
stack.push(txId);
while (!stack.isEmpty()) {
- GridCacheVersion v = stack.pop();
+ GridCacheVersion v = stack.peek();
+
+ if (visited.contains(v)) {
+ stack.pop();
+ inPath.remove(v);
- if (visited.contains(v))
continue;
+ }
visited.add(v);
Set<GridCacheVersion> children = wfg.get(v);
- if (children == null || children.isEmpty())
+ if (children == null || children.isEmpty()) {
+ stack.pop();
+ inPath.remove(v);
+
continue;
+ }
inPath.add(v);
for (GridCacheVersion w : children) {
- if (inPath.contains(w)) {
+ if (inPath.contains(w) && visited.contains(w)) {
List<GridCacheVersion> cycle = new ArrayList<>();
for (GridCacheVersion x = v; !x.equals(w); x = edgeTo.get(x))
@@ -158,15 +169,18 @@ public class TxDeadlockDetection {
private final Set<IgniteTxKey> keys;
/** Processed keys. */
+ @GridToStringInclude
private final Set<IgniteTxKey> processedKeys = new HashSet<>();
/** Processed nodes. */
private final Set<UUID> processedNodes = new HashSet<>();
/** Pending keys. */
+ @GridToStringInclude
private Map<UUID, Set<IgniteTxKey>> pendingKeys = new HashMap<>();
/** Nodes queue. */
+ @GridToStringInclude
private final UniqueDeque<UUID> nodesQueue = new UniqueDeque<>();
/** Preferred nodes. */
@@ -194,6 +208,7 @@ public class TxDeadlockDetection {
private int itersCnt;
/** Timeout object. */
+ @GridToStringExclude
private DeadlockTimeoutObject timeoutObj;
/** Timed out flag. */
@@ -252,8 +267,8 @@ public class TxDeadlockDetection {
if (topVer == null) // Tx manager already stopped
onDone();
-
- map(keys, Collections.<IgniteTxKey, TxLockList>emptyMap());
+ else
+ map(keys, Collections.<IgniteTxKey, TxLockList>emptyMap());
}
/**
@@ -441,14 +456,17 @@ public class TxDeadlockDetection {
* @param txLocks Tx locks.
*/
private void updateWaitForGraph(Map<IgniteTxKey, TxLockList> txLocks) {
+ if (txLocks == null || txLocks.isEmpty())
+ return;
+
for (Map.Entry<IgniteTxKey, TxLockList> e : txLocks.entrySet()) {
GridCacheVersion txOwner = null;
for (TxLock lock : e.getValue().txLocks()) {
- if (lock.owner()) {
- assert txOwner == null;
-
+ if (lock.owner() && txOwner == null) {
+ // The lock list may contain more than one owner. In that case ignore all owners
+ // except the first, because the first owner is the likely cause of the deadlock.
txOwner = lock.txId();
if (keys.contains(e.getKey()) && !txId.equals(lock.txId())) {
@@ -463,7 +481,7 @@ public class TxDeadlockDetection {
continue;
}
- if (lock.candiate()) {
+ if (lock.candiate() || lock.owner()) {
GridCacheVersion txId0 = lock.txId();
Set<GridCacheVersion> waitForTxs = wfg.get(txId0);
@@ -485,9 +503,9 @@ public class TxDeadlockDetection {
if (res != null && set) {
if (res.classError() != null) {
- IgniteLogger log = cctx.logger(TxDeadlockDetection.class);
+ IgniteLogger log = cctx.kernalContext().log(this.getClass());
- log.warning("Failed to finish deadlock detection due to an error: " + nodeId);
+ U.warn(log, "Failed to finish deadlock detection due to an error: " + nodeId);
onDone();
}
@@ -528,6 +546,11 @@ public class TxDeadlockDetection {
return false;
}
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TxDeadlockFuture.class, this);
+ }
+
/**
* Lock request timeout object.
*/
@@ -543,6 +566,10 @@ public class TxDeadlockDetection {
@Override public void onTimeout() {
timedOut = true;
+ IgniteLogger log = cctx.kernalContext().log(this.getClass());
+
+ U.warn(log, "Deadlock detection was timed out [timeout=" + DEADLOCK_TIMEOUT + ", fut=" + this + ']');
+
onDone();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
index 94b5620..2b524e8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
@@ -18,15 +18,29 @@
package org.apache.ignite.internal.processors.cache;
+import javax.cache.CacheException;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -34,11 +48,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionTimeoutException;
-import javax.cache.CacheException;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.MutableEntry;
-
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
@@ -62,6 +71,10 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+ TcpCommunicationSpi commSpi = new TestCommunicationSpi();
+
+ cfg.setCommunicationSpi(commSpi);
+
CacheConfiguration ccfg = new CacheConfiguration();
ccfg.setAtomicityMode(atomicityMode());
@@ -87,7 +100,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
- startGrids(1);
+ startGrids(2);
}
/** {@inheritDoc} */
@@ -98,7 +111,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
/**
* Success if user tx was timed out.
*
- * @throws Exception
+ * @throws Exception If failed.
*/
public void testUserTxTimeout() throws Exception {
final Ignite ignite = grid(0);
@@ -112,7 +125,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
/**
* Success if system caches weren't timed out.
*
- * @throws Exception
+ * @throws Exception If failed.
*/
public void testSystemCacheTx() throws Exception {
final Ignite ignite = grid(0);
@@ -143,27 +156,23 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
* Success if implicit tx fails.
*
* @param cache Cache name.
- * @throws Exception
+ * @throws Exception If failed.
*/
protected void checkImplicitTxTimeout(final IgniteCache<Object, Object> cache) throws Exception {
- try {
- cache.invoke("key", new EntryProcessor<Object, Object, Object>() {
- @Override public Object process(final MutableEntry<Object, Object> entry, final Object... args)
- throws EntryProcessorException {
- try {
- sleepForTxFailure();
- } catch (InterruptedException e) {
- throw new EntryProcessorException(e);
- }
+ TestCommunicationSpi.delay = true;
- return null;
- }
- });
+ Integer key = primaryKey(ignite(1).cache(CACHE_NAME));
+
+ try {
+ cache.put(key, 0);
fail("Timeout exception must be thrown");
}
catch (CacheException e) {
- // OK
+ // No-op.
+ }
+ finally {
+ TestCommunicationSpi.delay = false;
}
cache.clear();
@@ -174,7 +183,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
*
* @param cache Cache name.
* @param ignite Ignite instance.
- * @throws Exception
+ * @throws Exception If failed.
*/
protected void checkExplicitTxTimeout(final IgniteCache<Object, Object> cache, final Ignite ignite)
throws Exception {
@@ -198,7 +207,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
* Success if explicit tx doesn't fail.
*
* @param cache Cache instance.
- * @throws Exception
+ * @throws Exception If failed.
*/
protected void checkStartTxSuccess(final IgniteInternalCache<Object, Object> cache) throws Exception {
try (final IgniteInternalTx tx = CU.txStartInternal(cache.context(), cache, PESSIMISTIC, READ_COMMITTED)) {
@@ -220,7 +229,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
* Success if implicit tx fails.
*
* @param cache Cache instance.
- * @throws Exception
+ * @throws Exception If failed.
*/
protected void checkImplicitTxSuccess(final IgniteInternalCache<Object, Object> cache) throws Exception {
cache.invoke("key", new EntryProcessor<Object, Object, Object>() {
@@ -241,9 +250,39 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
/**
* Sleep multiple {@link #TX_TIMEOUT} times.
*
- * @throws InterruptedException
+ * @throws InterruptedException If interrupted.
*/
private void sleepForTxFailure() throws InterruptedException {
Thread.sleep(TX_TIMEOUT * 3);
}
+
+ /**
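+ * Communication SPI that holds back {@link GridNearTxPrepareRequest} messages while {@link #delay} is set.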
+ */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** Delay. */
+ private static volatile boolean delay;
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(
+ final ClusterNode node,
+ final Message msg,
+ final IgniteInClosure<IgniteException> ackC
+ ) throws IgniteSpiException {
+ if (msg instanceof GridIoMessage) {
+ Message msg0 = ((GridIoMessage)msg).message();
+
+ if (msg0 instanceof GridNearTxPrepareRequest && delay) {
+ try {
+ U.sleep(TX_TIMEOUT * 2);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java
index c417821..8475175 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java
@@ -20,10 +20,10 @@ package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
-import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
@@ -154,8 +154,8 @@ public class IgniteTxTimeoutAbstractTest extends GridCommonAbstractTest {
assert false : "Timeout never happened for transaction: " + tx;
}
- catch (CacheException e) {
- if (!(e.getCause() instanceof TransactionTimeoutException))
+ catch (Exception e) {
+ if (!(X.hasCause(e, TransactionTimeoutException.class)))
throw e;
info("Received expected timeout exception [msg=" + e.getMessage() + ", tx=" + tx + ']');
@@ -164,4 +164,4 @@ public class IgniteTxTimeoutAbstractTest extends GridCommonAbstractTest {
tx.close();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeoutFullApiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeoutFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeoutFullApiTest.java
new file mode 100644
index 0000000..89fe8e0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeoutFullApiTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
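+ * Runs {@link GridCachePartitionedMultiNodeFullApiSelfTest} with the default tx timeout set to {@link Long#MAX_VALUE}.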
+ */
+public class CachePartitionedMultiNodeLongTxTimeoutFullApiTest extends GridCachePartitionedMultiNodeFullApiSelfTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.getTransactionConfiguration().setDefaultTxTimeout(Long.MAX_VALUE);
+
+ return cfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java
new file mode 100644
index 0000000..3e3b84e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+
+/**
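+ * Runs the parent full API test with a near cache configuration and {@link Long#MAX_VALUE} default tx timeout.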
+ */
+public class CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest extends
+ GridCachePartitionedMultiNodeFullApiSelfTest {
+ /** {@inheritDoc} */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return new NearCacheConfiguration();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.getTransactionConfiguration().setDefaultTxTimeout(Long.MAX_VALUE);
+
+ return cfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxTimeoutSelfTest.java
index cfa93ac..e27207d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxTimeoutSelfTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache.local;
-import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -154,7 +153,7 @@ public class GridCacheLocalTxTimeoutSelfTest extends GridCommonAbstractTest {
tx.commit();
}
- catch (CacheException e) {
+ catch (Exception e) {
assertTrue(X.hasCause(e, TransactionTimeoutException.class));
info("Received expected optimistic exception: " + e.getMessage());
@@ -166,4 +165,4 @@ public class GridCacheLocalTxTimeoutSelfTest extends GridCommonAbstractTest {
assert wasEx;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java
index 20467c2..b0a407c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java
@@ -24,10 +24,12 @@ import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
import junit.framework.TestCase;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
import static org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.findCycle;
@@ -95,6 +97,14 @@ public class DepthFirstSearchTest extends TestCase {
wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{
put(T1, new HashSet<GridCacheVersion>(){{add(T2);}});
+ put(T2, new HashSet<GridCacheVersion>(){{add(T3);}});
+ put(T4, new HashSet<GridCacheVersion>(){{add(T1); add(T2); add(T3);}});
+ }};
+
+ assertAllNull(wfg);
+
+ wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{
+ put(T1, new HashSet<GridCacheVersion>(){{add(T2);}});
put(T3, new HashSet<GridCacheVersion>(){{add(T4);}});
put(T4, new HashSet<GridCacheVersion>(){{add(T1);}});
}};
@@ -228,6 +238,94 @@ public class DepthFirstSearchTest extends TestCase {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testFindCycle4() throws Exception {
+ Map<GridCacheVersion, Set<GridCacheVersion>> wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{
+ put(T1, Collections.singleton(T2));
+ put(T2, asLinkedHashSet(T3, T4));
+ put(T3, Collections.singleton(T4));
+ put(T4, Collections.singleton(T5));
+ put(T6, Collections.singleton(T3));
+ }};
+
+ assertNull(findCycle(wfg, T1));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRandomNoExceptions() throws Exception {
+ int maxNodesCnt = 100;
+ int minNodesCnt = 10;
+ int maxWaitForNodesCnt = 20;
+
+ int cyclesFound = 0;
+ int cyclesNotFound = 0;
+
+ Random seedRnd = new Random();
+
+ Random rnd = new Random();
+
+ for (int i = 0; i < 50000; i++) {
+ long seed = seedRnd.nextLong();
+
+ rnd.setSeed(seed);
+
+ System.out.println(">>> Iteration " + i + " with seed " + seed);
+
+ int nodesCnt = rnd.nextInt(maxNodesCnt - minNodesCnt) + minNodesCnt;
+
+ Map<GridCacheVersion, Set<GridCacheVersion>> wfg = new HashMap<>();
+
+ for (int j = 0; j < nodesCnt; j++) {
+ if (rnd.nextInt(100) > 30) {
+ int waitForNodesCnt = rnd.nextInt(maxWaitForNodesCnt);
+
+ Set<GridCacheVersion> waitForNodes = null;
+
+ if (waitForNodesCnt > 0) {
+ waitForNodes = new LinkedHashSet<>();
+
+ for (int k = 0; k < waitForNodesCnt;) {
+ int n = rnd.nextInt(nodesCnt);
+
+ if (n != j) {
+ waitForNodes.add(new GridCacheVersion(n, 0, 0, 0));
+ k++;
+ }
+ }
+ }
+
+ wfg.put(new GridCacheVersion(j, 0, 0, 0), waitForNodes);
+ }
+ }
+
+ for (int j = 0; j < nodesCnt; j++) {
+ try {
+ List<GridCacheVersion> cycle = findCycle(wfg, new GridCacheVersion(j, 0, 0, 0));
+
+ if (cycle == null)
+ cyclesNotFound++;
+ else
+ cyclesFound++;
+ }
+ catch (Throwable e) {
+ U.error(null, "Error during finding cycle in graph: ", e);
+
+ U.warn(null, "Seed: " + seed);
+
+ U.warn(null, "Wait-for-graph: " + wfg);
+
+ fail();
+ }
+ }
+ }
+
+ System.out.println(">>> Test finished. Cycles found: " + cyclesFound + ", cycles not found: " + cyclesNotFound);
+ }
+
+ /**
* @param wfg Wait-for-graph.
*/
private static void assertAllNull(Map<GridCacheVersion, Set<GridCacheVersion>> wfg, GridCacheVersion... ignore) {
@@ -249,4 +347,4 @@ public class DepthFirstSearchTest extends TestCase {
return set;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
new file mode 100644
index 0000000..c9d18eb
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collection;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
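+ * Runs randomly conflicting transactions while an extra node is repeatedly restarted, with deadlock detection enabled and disabled, and checks that no deadlock detection futures are left behind.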
+ */
+public class TxDeadlockDetectionNoHangsTest extends GridCommonAbstractTest {
+ /** Nodes count. */
+ private static final int NODES_CNT = 3;
+
+ /** Cache. */
+ private static final String CACHE = "cache";
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration ccfg = defaultCacheConfiguration();
+
+ ccfg.setName(CACHE);
+ ccfg.setCacheMode(CacheMode.PARTITIONED);
+ ccfg.setBackups(1);
+ ccfg.setNearConfiguration(null);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ startGridsMultiThreaded(NODES_CNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ GridTestUtils.setFieldValue(null, TxDeadlockDetection.class, "DEADLOCK_TIMEOUT", (int)(getTestTimeout() * 2));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ GridTestUtils.setFieldValue(null, TxDeadlockDetection.class, "DEADLOCK_TIMEOUT",
+ getInteger(IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT, 60000));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 10 * 60 * 1000;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoHangsPessimistic() throws Exception {
+ assertTrue(grid(0).context().cache().context().tm().deadlockDetectionEnabled());
+
+ doTest(PESSIMISTIC);
+
+ try {
+ GridTestUtils.setFieldValue(null, IgniteTxManager.class, "DEADLOCK_MAX_ITERS", 0);
+
+ assertFalse(grid(0).context().cache().context().tm().deadlockDetectionEnabled());
+
+ doTest(PESSIMISTIC);
+ }
+ finally {
+ GridTestUtils.setFieldValue(null, IgniteTxManager.class, "DEADLOCK_MAX_ITERS",
+ IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000));
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoHangsOptimistic() throws Exception {
+ assertTrue(grid(0).context().cache().context().tm().deadlockDetectionEnabled());
+
+ doTest(OPTIMISTIC);
+
+ try {
+ GridTestUtils.setFieldValue(null, IgniteTxManager.class, "DEADLOCK_MAX_ITERS", 0);
+
+ assertFalse(grid(0).context().cache().context().tm().deadlockDetectionEnabled());
+
+ doTest(OPTIMISTIC);
+ }
+ finally {
+ GridTestUtils.setFieldValue(null, IgniteTxManager.class, "DEADLOCK_MAX_ITERS",
+ IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000));
+ }
+ }
+
+ /**
+ * @param concurrency Concurrency.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void doTest(final TransactionConcurrency concurrency) throws IgniteCheckedException {
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ IgniteInternalFuture<Long> restartFut = null;
+
+ try {
+ restartFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+ @Override public void run() {
+ while (!stop.get()) {
+ try {
+ U.sleep(500);
+
+ startGrid(NODES_CNT);
+
+ awaitPartitionMapExchange();
+
+ U.sleep(500);
+
+ stopGrid(NODES_CNT);
+ }
+ catch (Exception e) {
+ // No-op.
+ }
+ }
+ }
+ }, 1, "restart-thread");
+
+ long stopTime = System.currentTimeMillis() + 2 * 60_000L;
+
+ for (int i = 0; System.currentTimeMillis() < stopTime; i++) {
+ boolean detectionEnabled = grid(0).context().cache().context().tm().deadlockDetectionEnabled();
+
+ log.info(">>> Iteration " + i + " (detection is " + (detectionEnabled ? "enabled" : "disabled") + ')');
+
+ final AtomicInteger threadCnt = new AtomicInteger();
+
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+ @Override public void run() {
+ int threadNum = threadCnt.getAndIncrement();
+
+ Ignite ignite = ignite(threadNum % NODES_CNT);
+
+ IgniteCache<Integer, Integer> cache = ignite.cache(CACHE);
+
+ try (Transaction tx = ignite.transactions().txStart(concurrency, REPEATABLE_READ, 500, 0)) {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ for (int i = 0; i < 50; i++) {
+ int key = rnd.nextInt(50);
+
+ if (log.isDebugEnabled()) {
+ log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
+ ", tx=" + tx + ", key=" + key + ']');
+ }
+
+ cache.put(key, 0);
+ }
+
+ tx.commit();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }, NODES_CNT * 3, "tx-thread");
+
+ fut.get();
+ }
+ }
+ finally {
+ stop.set(true);
+
+ if (restartFut != null)
+ restartFut.get();
+
+ checkDetectionFutures();
+ }
+ }
+
+ /**
+ * Asserts that the first {@code NODES_CNT} nodes have no pending deadlock detection futures.
+ */
+ private void checkDetectionFutures() {
+ for (int i = 0; i < NODES_CNT ; i++) {
+ Ignite ignite = ignite(i);
+
+ IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm();
+
+ Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures();
+
+ assertTrue(futs.isEmpty());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java
index 3d0beac..87bc70f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java
@@ -21,8 +21,9 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Collection;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -47,7 +48,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.transactions.TransactionTimeoutException;
-import org.jsr166.ThreadLocalRandom8;
import static org.apache.ignite.internal.util.typedef.X.hasCause;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -152,7 +152,7 @@ public class TxDeadlockDetectionTest extends GridCommonAbstractTest {
IgniteCache<Integer, Integer> cache = ignite.cache(CACHE);
try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 700, 0)) {
- ThreadLocalRandom8 rnd = ThreadLocalRandom8.current();
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
for (int i = 0; i < 50; i++) {
int key = rnd.nextInt(50);
@@ -217,7 +217,7 @@ public class TxDeadlockDetectionTest extends GridCommonAbstractTest {
cache.put(key, 0);
- barrier.await(timeout + 100, TimeUnit.MILLISECONDS);
+ barrier.await(timeout + 1000, TimeUnit.MILLISECONDS);
tx.commit();
}
@@ -281,7 +281,7 @@ public class TxDeadlockDetectionTest extends GridCommonAbstractTest {
log.info(">>> Performs sleep. [node=" + ((IgniteKernal)ignite).localNode() +
", tx=" + tx + ']');
- U.sleep(timeout * 2);
+ U.sleep(timeout * 3);
}
else {
int key2 = threadNum + 1;
@@ -406,8 +406,7 @@ public class TxDeadlockDetectionTest extends GridCommonAbstractTest {
IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm();
- ConcurrentMap<Long, TxDeadlockDetection.TxDeadlockFuture> futs =
- GridTestUtils.getFieldValue(txMgr, IgniteTxManager.class, "deadlockDetectFuts");
+ Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures();
assertTrue(futs.isEmpty());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java
new file mode 100644
index 0000000..7b40da2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionDeadlockException;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+
+import static org.apache.ignite.internal.util.typedef.X.hasCause;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
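+ * Tests deadlock detection for two optimistic transactions that update keys of two caches in opposite order.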
+ */
+public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstractTest {
+ /** Nodes count. */
+ private static final int NODES_CNT = 2;
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (isDebug()) {
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.failureDetectionTimeoutEnabled(false);
+
+ cfg.setDiscoverySpi(discoSpi);
+ }
+
+ TcpCommunicationSpi commSpi = new TestCommunicationSpi();
+
+ cfg.setCommunicationSpi(commSpi);
+
+ CacheConfiguration ccfg0 = defaultCacheConfiguration();
+
+ ccfg0.setName("cache0");
+ ccfg0.setCacheMode(CacheMode.PARTITIONED);
+ ccfg0.setBackups(1);
+ ccfg0.setNearConfiguration(null);
+
+ CacheConfiguration ccfg1 = defaultCacheConfiguration();
+
+ ccfg1.setName("cache1");
+ ccfg1.setCacheMode(CacheMode.PARTITIONED);
+ ccfg1.setBackups(1);
+ ccfg1.setNearConfiguration(null);
+
+ cfg.setCacheConfiguration(ccfg0, ccfg1);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(NODES_CNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDeadlock() throws Exception {
+ // Sometimes both transactions commit successfully, so we repeat the attempt.
+ while (!doTestDeadlock()) {}
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private boolean doTestDeadlock() throws Exception {
+ TestCommunicationSpi.init(2);
+
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ final AtomicInteger threadCnt = new AtomicInteger();
+
+ final AtomicBoolean deadlock = new AtomicBoolean();
+
+ final AtomicInteger commitCnt = new AtomicInteger();
+
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+ @Override public void run() {
+ int threadNum = threadCnt.getAndIncrement();
+
+ Ignite ignite = ignite(0);
+
+ IgniteCache<Integer, Integer> cache1 = ignite.cache("cache" + (threadNum == 0 ? 0 : 1));
+
+ IgniteCache<Integer, Integer> cache2 = ignite.cache("cache" + (threadNum == 0 ? 1 : 0));
+
+ try (Transaction tx =
+ ignite.transactions().txStart(OPTIMISTIC, REPEATABLE_READ, 500, 0)
+ ) {
+ int key1 = primaryKey(cache1);
+
+ log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
+ ", tx=" + tx + ", key=" + key1 + ", cache=" + cache1.getName() + ']');
+
+ cache1.put(key1, 0);
+
+ barrier.await();
+
+ int key2 = primaryKey(cache2);
+
+ log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
+ ", tx=" + tx + ", key=" + key2 + ", cache=" + cache2.getName() + ']');
+
+ cache2.put(key2, 1);
+
+ tx.commit();
+
+ commitCnt.incrementAndGet();
+ }
+ catch (Throwable e) {
+ // At least one stack trace should contain TransactionDeadlockException.
+ if (hasCause(e, TransactionTimeoutException.class) &&
+ hasCause(e, TransactionDeadlockException.class)
+ ) {
+ if (deadlock.compareAndSet(false, true))
+ U.error(log, "At least one stack trace should contain " +
+ TransactionDeadlockException.class.getSimpleName(), e);
+ }
+ }
+ }
+ }, 2, "tx-thread");
+
+ fut.get();
+
+ if (commitCnt.get() == 2)
+ return false;
+
+ assertTrue(deadlock.get());
+
+ for (int i = 0; i < NODES_CNT ; i++) {
+ Ignite ignite = ignite(i);
+
+ IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm();
+
+ Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures();
+
+ assertTrue(futs.isEmpty());
+ }
+
+ return true;
+ }
+
+ /**
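+ * Communication SPI that records transaction versions from near prepare responses and delays near prepare requests of already recorded transactions until {@code TX_CNT} versions are recorded.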
+ */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** Tx count. */
+ private static volatile int TX_CNT;
+
+ /** Tx ids. */
+ private static final Set<GridCacheVersion> TX_IDS = new GridConcurrentHashSet<>();
+
+ /**
+ * @param txCnt Tx count.
+ */
+ private static void init(int txCnt) {
+ TX_CNT = txCnt;
+ TX_IDS.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(
+ final ClusterNode node,
+ final Message msg,
+ final IgniteInClosure<IgniteException> ackC
+ ) throws IgniteSpiException {
+ if (msg instanceof GridIoMessage) {
+ Message msg0 = ((GridIoMessage)msg).message();
+
+ if (msg0 instanceof GridNearTxPrepareRequest) {
+ final GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg0;
+
+ GridCacheVersion txId = req.version();
+
+ if (TX_IDS.contains(txId)) {
+ while (TX_IDS.size() < TX_CNT) {
+ try {
+ U.sleep(50);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ else if (msg0 instanceof GridNearTxPrepareResponse) {
+ GridNearTxPrepareResponse res = (GridNearTxPrepareResponse)msg0;
+
+ GridCacheVersion txId = res.version();
+
+ TX_IDS.add(txId);
+ }
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
new file mode 100644
index 0000000..aa240aa
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
@@ -0,0 +1,574 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
+import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionDeadlockException;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+import static org.apache.ignite.internal.util.typedef.X.cause;
+import static org.apache.ignite.internal.util.typedef.X.hasCause;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
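+ * Tests deadlock detection for optimistic transactions on partitioned, near-enabled partitioned and replicated caches.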
+ */
+public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest {
+ /** Cache name. */
+ private static final String CACHE_NAME = "cache";
+
+ /** Nodes count (actually twice as many nodes will be started: server + client). */
+ private static final int NODES_CNT = 4;
+
+ /** No op transformer. */
+ private static final NoOpTransformer NO_OP_TRANSFORMER = new NoOpTransformer();
+
+ /** Wrapping transformer. */
+ private static final WrappingTransformer WRAPPING_TRANSFORMER = new WrappingTransformer();
+
+ /** Client mode flag. */
+ private static boolean client;
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (isDebug()) {
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.failureDetectionTimeoutEnabled(false);
+
+ cfg.setDiscoverySpi(discoSpi);
+ }
+
+ TcpCommunicationSpi commSpi = new TestCommunicationSpi();
+
+ cfg.setCommunicationSpi(commSpi);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ client = false;
+
+ startGrids(NODES_CNT);
+
+ client = true;
+
+ for (int i = 0; i < NODES_CNT; i++)
+ startGrid(i + NODES_CNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDeadlocksPartitioned() throws Exception {
+ for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) {
+ doTestDeadlocks(createCache(PARTITIONED, syncMode, false), NO_OP_TRANSFORMER);
+ doTestDeadlocks(createCache(PARTITIONED, syncMode, false), WRAPPING_TRANSFORMER);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDeadlocksPartitionedNear() throws Exception {
+ for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) {
+ doTestDeadlocks(createCache(PARTITIONED, syncMode, true), NO_OP_TRANSFORMER);
+ doTestDeadlocks(createCache(PARTITIONED, syncMode, true), WRAPPING_TRANSFORMER);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDeadlocksReplicated() throws Exception {
+ for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) {
+ doTestDeadlocks(createCache(REPLICATED, syncMode, false), NO_OP_TRANSFORMER);
+ doTestDeadlocks(createCache(REPLICATED, syncMode, false), WRAPPING_TRANSFORMER);
+ }
+ }
+
+ /**
+ * @param cacheMode Cache mode.
+ * @param syncMode Write sync mode.
+ * @param near Near.
+ * @return Created cache.
+ */
+ @SuppressWarnings("unchecked")
+ private IgniteCache createCache(CacheMode cacheMode, CacheWriteSynchronizationMode syncMode, boolean near) {
+ CacheConfiguration ccfg = defaultCacheConfiguration();
+
+ ccfg.setName(CACHE_NAME);
+ ccfg.setCacheMode(cacheMode);
+ ccfg.setBackups(1);
+ ccfg.setNearConfiguration(near ? new NearCacheConfiguration() : null);
+ ccfg.setWriteSynchronizationMode(syncMode);
+
+ IgniteCache cache = ignite(0).createCache(ccfg);
+
+ if (near) {
+ for (int i = 0; i < NODES_CNT; i++) {
+ Ignite client = ignite(i + NODES_CNT);
+
+ assertTrue(client.configuration().isClientMode());
+
+ client.createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
+ }
+ }
+
+ return cache;
+ }
+
+ /**
+ * @param cache Cache.
+ * @param transformer Transformer closure.
+ * @throws Exception If failed.
+ */
+ private void doTestDeadlocks(IgniteCache cache, IgniteClosure<Integer, Object> transformer) throws Exception {
+ try {
+ awaitPartitionMapExchange();
+
+ doTestDeadlock(3, false, true, true, transformer);
+ doTestDeadlock(3, false, false, false, transformer);
+ doTestDeadlock(3, false, false, true, transformer);
+
+ doTestDeadlock(4, false, true, true, transformer);
+ doTestDeadlock(4, false, false, false, transformer);
+ doTestDeadlock(4, false, false, true, transformer);
+ }
+ catch (Throwable e) {
+ U.error(log, "Unexpected exception: ", e);
+
+ fail();
+ }
+ finally {
+ if (cache != null)
+ cache.destroy();
+ }
+ }
+
+ /**
+  * Provokes a deadlock between concurrent optimistic transactions and verifies that it is reported:
+  * at least one transaction must fail with {@link TransactionDeadlockException}, no transactions of the
+  * test cache, deadlock detection futures or entry locks may be left on any node, and the deadlock
+  * report must list every involved transaction and every locked key.
+  *
+  * @param txCnt Number of transactions (also the number of barrier parties).
+  * @param loc If {@code true}, transactions are started on node 0 in 2 threads.
+  * @param lockPrimaryFirst Inverted and passed to key generation as the {@code reverse} flag.
+  * @param clientTx If {@code true}, transactions are started on nodes with indexes {@code txCnt} and above.
+  * @param transformer Transformer closure applied to integer keys before they are used as cache keys.
+  * @throws Exception If failed.
+  */
+ private void doTestDeadlock(
+     final int txCnt,
+     final boolean loc,
+     boolean lockPrimaryFirst,
+     final boolean clientTx,
+     final IgniteClosure<Integer, Object> transformer
+ ) throws Exception {
+     log.info(">>> Test deadlock [txCnt=" + txCnt + ", loc=" + loc + ", lockPrimaryFirst=" + lockPrimaryFirst +
+         ", clientTx=" + clientTx + ", transformer=" + transformer.getClass().getName() + ']');
+
+     TestCommunicationSpi.init(txCnt);
+
+     final AtomicInteger threadCnt = new AtomicInteger();
+
+     // NOTE(review): the barrier expects txCnt parties while only 2 threads are started when loc == true.
+     final CyclicBarrier barrier = new CyclicBarrier(txCnt);
+
+     final AtomicReference<TransactionDeadlockException> deadlockErr = new AtomicReference<>();
+
+     final List<List<Integer>> keySets = generateKeys(txCnt, loc, !lockPrimaryFirst);
+
+     final Set<Integer> involvedKeys = new GridConcurrentHashSet<>();
+     final Set<Integer> involvedLockedKeys = new GridConcurrentHashSet<>();
+     final Set<IgniteInternalTx> involvedTxs = new GridConcurrentHashSet<>();
+
+     IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+         @Override public void run() {
+             int threadNum = threadCnt.incrementAndGet();
+
+             Ignite ignite = loc ? ignite(0) : ignite(clientTx ? threadNum - 1 + txCnt : threadNum - 1);
+
+             IgniteCache<Object, Integer> cache = ignite.cache(CACHE_NAME);
+
+             List<Integer> keys = keySets.get(threadNum - 1);
+
+             int txTimeout = 500 + txCnt * 100;
+
+             try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, REPEATABLE_READ, txTimeout, 0)) {
+                 IgniteInternalTx tx0 = ((TransactionProxyImpl)tx).tx();
+
+                 involvedTxs.add(tx0);
+
+                 Integer key = keys.get(0);
+
+                 involvedKeys.add(key);
+
+                 log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
+                     ", tx=" + tx + ", key=" + transformer.apply(key) + ']');
+
+                 cache.put(transformer.apply(key), 0);
+
+                 involvedLockedKeys.add(key);
+
+                 // Every transaction enlists its first key before any of them proceeds to the second one.
+                 barrier.await();
+
+                 key = keys.get(1);
+
+                 ClusterNode primaryNode =
+                     ((IgniteCacheProxy)cache).context().affinity().primary(key, NONE);
+
+                 List<Integer> primaryKeys =
+                     primaryKeys(grid(primaryNode).cache(CACHE_NAME), 5, key + (100 * threadNum));
+
+                 Map<Object, Integer> entries = new HashMap<>();
+
+                 involvedKeys.add(key);
+
+                 entries.put(transformer.apply(key), 0);
+
+                 // Extra keys that take part in the transaction next to the contended one.
+                 for (Integer i : primaryKeys) {
+                     involvedKeys.add(i);
+
+                     entries.put(transformer.apply(i), 1);
+
+                     involvedKeys.add(i + 13);
+
+                     entries.put(transformer.apply(i + 13), 2);
+                 }
+
+                 log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
+                     ", tx=" + tx + ", entries=" + entries + ']');
+
+                 cache.putAll(entries);
+
+                 tx.commit();
+             }
+             catch (Throwable e) {
+                 U.error(log, "Expected exception: ", e);
+
+                 // At least one stack trace should contain TransactionDeadlockException.
+                 if (hasCause(e, TransactionTimeoutException.class) &&
+                     hasCause(e, TransactionDeadlockException.class)
+                 ) {
+                     if (deadlockErr.compareAndSet(null, cause(e, TransactionDeadlockException.class)))
+                         U.error(log, "At least one stack trace should contain " +
+                             TransactionDeadlockException.class.getSimpleName(), e);
+                 }
+             }
+         }
+     }, loc ? 2 : txCnt, "tx-thread");
+
+     try {
+         fut.get();
+     }
+     catch (IgniteCheckedException e) {
+         // Use the test logger like every other call here, and keep the cause in the failure message.
+         U.error(log, "Unexpected exception", e);
+
+         fail("Unexpected exception: " + e);
+     }
+
+     U.sleep(1000);
+
+     TransactionDeadlockException deadlockE = deadlockErr.get();
+
+     assertNotNull("No transaction has failed with " + TransactionDeadlockException.class.getSimpleName(),
+         deadlockE);
+
+     boolean fail = false;
+
+     // Check transactions, futures and entry locks state.
+     for (int i = 0; i < NODES_CNT * 2; i++) {
+         Ignite ignite = ignite(i);
+
+         int cacheId = ((IgniteCacheProxy)ignite.cache(CACHE_NAME)).context().cacheId();
+
+         GridCacheSharedContext<Object, Object> cctx = ((IgniteKernal)ignite).context().cache().context();
+
+         IgniteTxManager txMgr = cctx.tm();
+
+         Collection<IgniteInternalTx> activeTxs = txMgr.activeTransactions();
+
+         for (IgniteInternalTx tx : activeTxs) {
+             Collection<IgniteTxEntry> entries = tx.allEntries();
+
+             for (IgniteTxEntry entry : entries) {
+                 if (entry.cacheId() == cacheId) {
+                     fail = true;
+
+                     U.error(log, "Transaction still exists: " + "\n" + tx.xidVersion() +
+                         "\n" + tx.nearXidVersion() + "\n nodeId=" + cctx.localNodeId() + "\n tx=" + tx);
+                 }
+             }
+         }
+
+         Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures();
+
+         assertTrue("Deadlock detection futures are left [nodeIdx=" + i + ", futs=" + futs + ']', futs.isEmpty());
+
+         GridCacheAdapter<Object, Integer> intCache = internalCache(i, CACHE_NAME);
+
+         GridCacheConcurrentMap map = intCache.map();
+
+         for (Integer key : involvedKeys) {
+             Object key0 = transformer.apply(key);
+
+             KeyCacheObject keyCacheObj = intCache.context().toCacheKeyObject(key0);
+
+             GridCacheMapEntry entry = map.getEntry(keyCacheObj);
+
+             if (entry != null)
+                 assertNull("Entry still has locks " + entry, entry.mvccAllLocal());
+         }
+     }
+
+     if (fail)
+         fail("Some transactions still exist");
+
+     // Check deadlock report
+     String msg = deadlockE.getMessage();
+
+     for (IgniteInternalTx tx : involvedTxs) {
+         String txInfo =
+             "[txId=" + tx.xidVersion() + ", nodeId=" + tx.nodeId() + ", threadId=" + tx.threadId() + ']';
+
+         assertTrue("Deadlock report misses transaction " + txInfo + ": " + msg, msg.contains(txInfo));
+     }
+
+     for (Integer key : involvedKeys) {
+         Object key0 = transformer.apply(key);
+
+         if (involvedLockedKeys.contains(key))
+             assertTrue("Deadlock report misses key " + key0 + ": " + msg,
+                 msg.contains("[key=" + key0 + ", cache=" + CACHE_NAME + ']'));
+         else
+             // Terminate the pattern with the field separator: an open-ended "[key=1" would also
+             // match a legitimately reported "[key=13, ...".
+             assertFalse("Deadlock report contains not locked key " + key0 + ": " + msg,
+                 msg.contains("[key=" + key0 + ", cache="));
+     }
+ }
+
+ /**
+  * Generates the key sets to be used by the transactions, one two-element list per transaction.
+  * <p>
+  * For a local cache two lists are produced from two primary keys of node 0: one in the original
+  * order and one reversed. Otherwise the list with index {@code i} holds a primary key of node
+  * {@code i + 1} and a primary key of node {@code i + 2}, node indexes wrapping around at
+  * {@code nodesCnt}.
+  *
+  * @param nodesCnt Nodes count.
+  * @param loc Local cache.
+  * @param reverse Whether to swap the two keys of every list (ignored if {@code loc} is {@code true}).
+  * @return Generated key sets.
+  * @throws IgniteCheckedException If failed.
+  */
+ private List<List<Integer>> generateKeys(int nodesCnt, boolean loc, boolean reverse) throws IgniteCheckedException {
+     List<List<Integer>> res = new ArrayList<>();
+
+     if (!loc) {
+         for (int idx = 0; idx < nodesCnt; idx++) {
+             int firstIdx = idx + 1;
+             int secondIdx = idx + 2;
+
+             // Wrap indexes that have run past the last node.
+             if (firstIdx >= nodesCnt)
+                 firstIdx -= nodesCnt;
+
+             if (secondIdx >= nodesCnt)
+                 secondIdx -= nodesCnt;
+
+             List<Integer> pair = new ArrayList<>(2);
+
+             pair.add(primaryKey(ignite(firstIdx).cache(CACHE_NAME)));
+             pair.add(primaryKey(ignite(secondIdx).cache(CACHE_NAME)));
+
+             if (reverse)
+                 Collections.reverse(pair);
+
+             res.add(pair);
+         }
+
+         return res;
+     }
+
+     List<Integer> locKeys = primaryKeys(ignite(0).cache(CACHE_NAME), 2);
+
+     // Snapshot the straight order first, then reverse the source list in place.
+     res.add(new ArrayList<>(locKeys));
+
+     Collections.reverse(locKeys);
+
+     res.add(locKeys);
+
+     return res;
+ }
+
+ /**
+  * Transformer that leaves the integer key untouched.
+  */
+ private static class NoOpTransformer implements IgniteClosure<Integer, Object> {
+     /** {@inheritDoc} */
+     @Override public Object apply(Integer key) {
+         return key;
+     }
+ }
+
+ /**
+ *
+ */
+ private static class WrappingTransformer implements IgniteClosure<Integer, Object> {
+ /** {@inheritDoc} */
+ @Override public Object apply(Integer val) {
+ return new KeyObject(val);
+ }
+ }
+
+ /**
+  * Immutable key made of an integer id and a name derived from that id.
+  * Equality takes both fields into account, the hash code is the id.
+  */
+ private static class KeyObject implements Serializable {
+     /** */
+     private static final long serialVersionUID = 0L;
+
+     /** Id. */
+     private final int id;
+
+     /** Name. */
+     private final String name;
+
+     /**
+      * @param id Id.
+      */
+     public KeyObject(int id) {
+         this.id = id;
+         this.name = "KeyObject" + id;
+     }
+
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return "KeyObject{" +
+             "id=" + id +
+             ", name='" + name + '\'' +
+             '}';
+     }
+
+     /** {@inheritDoc} */
+     @Override public boolean equals(Object o) {
+         if (this == o)
+             return true;
+
+         if (o == null || getClass() != o.getClass())
+             return false;
+
+         KeyObject obj = (KeyObject)o;
+
+         return id == obj.id && name.equals(obj.name);
+     }
+
+     /** {@inheritDoc} */
+     @Override public int hashCode() {
+         return id;
+     }
+ }
+
+ /**
+  * Communication SPI that records the version of every {@link GridNearTxPrepareResponse} it sends and
+  * holds back a {@link GridNearTxPrepareRequest} whose version has already been recorded until the
+  * number of recorded versions reaches the configured transaction count. The state is static, i.e.
+  * shared by all instances of this SPI.
+  */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+     /** Tx count. */
+     private static volatile int TX_CNT;
+
+     /** Tx ids. */
+     private static final Set<GridCacheVersion> TX_IDS = new GridConcurrentHashSet<>();
+
+     /**
+      * Resets the recorded versions and sets the number of versions to wait for.
+      *
+      * @param txCnt Tx count.
+      */
+     private static void init(int txCnt) {
+         TX_CNT = txCnt;
+         TX_IDS.clear();
+     }
+
+     /** {@inheritDoc} */
+     @Override public void sendMessage(
+         final ClusterNode node,
+         final Message msg,
+         final IgniteInClosure<IgniteException> ackC
+     ) throws IgniteSpiException {
+         if (msg instanceof GridIoMessage) {
+             Message msg0 = ((GridIoMessage)msg).message();
+
+             if (msg0 instanceof GridNearTxPrepareRequest) {
+                 GridCacheVersion txId = ((GridNearTxPrepareRequest)msg0).version();
+
+                 if (TX_IDS.contains(txId))
+                     awaitAllVersionsRecorded();
+             }
+             else if (msg0 instanceof GridNearTxPrepareResponse)
+                 TX_IDS.add(((GridNearTxPrepareResponse)msg0).version());
+         }
+
+         super.sendMessage(node, msg, ackC);
+     }
+
+     /**
+      * Blocks the calling thread until the number of recorded versions reaches the configured
+      * transaction count or the thread is interrupted.
+      */
+     private static void awaitAllVersionsRecorded() {
+         while (TX_IDS.size() < TX_CNT) {
+             try {
+                 U.sleep(50);
+             }
+             catch (IgniteInterruptedCheckedException ignored) {
+                 // Stop waiting instead of looping on: keep the interrupt flag set for the caller
+                 // and let the message through.
+                 Thread.currentThread().interrupt();
+
+                 break;
+             }
+         }
+     }
+ }
+}