You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/08/31 06:33:11 UTC
[21/38] ignite git commit: ignite-2968 Deadlock detection for optimistic tx and near caches
ignite-2968 Deadlock detection for optimistic tx and near caches
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0465874d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0465874d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0465874d
Branch: refs/heads/ignite-3443
Commit: 0465874d9dddcf962a82a2ef38589121201f0b75
Parents: 2891703
Author: agura <ag...@gridgain.com>
Authored: Wed Aug 24 21:13:29 2016 +0300
Committer: agura <ag...@gridgain.com>
Committed: Mon Aug 29 16:01:16 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 19 +-
.../GridCachePartitionExchangeManager.java | 7 +
.../GridDistributedTxPrepareRequest.java | 4 +-
.../distributed/dht/GridDhtLockFuture.java | 53 +-
.../distributed/dht/GridDhtTxFinishFuture.java | 4 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 26 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 109 +++-
.../dht/GridDhtTxPrepareRequest.java | 4 +-
.../colocated/GridDhtColocatedLockFuture.java | 37 +-
.../distributed/near/GridNearLockFuture.java | 90 ++-
...arOptimisticSerializableTxPrepareFuture.java | 13 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 263 ++++++---
...ridNearOptimisticTxPrepareFutureAdapter.java | 5 +-
.../GridNearPessimisticTxPrepareFuture.java | 8 +-
.../near/GridNearTxFinishFuture.java | 5 +-
.../cache/distributed/near/GridNearTxLocal.java | 16 +-
.../near/GridNearTxPrepareRequest.java | 4 +-
.../cache/transactions/IgniteInternalTx.java | 3 +-
.../cache/transactions/IgniteTxAdapter.java | 37 +-
.../cache/transactions/IgniteTxHandler.java | 9 +-
.../transactions/IgniteTxLocalAdapter.java | 19 +-
.../cache/transactions/IgniteTxManager.java | 86 ++-
.../cache/transactions/IgniteTxStateImpl.java | 11 +-
.../cache/transactions/TxDeadlockDetection.java | 51 +-
.../cache/IgniteTxConfigCacheSelfTest.java | 91 ++-
.../IgniteTxTimeoutAbstractTest.java | 8 +-
...tionedMultiNodeLongTxTimeoutFullApiTest.java | 34 ++
...nabledMultiNodeLongTxTimeoutFullApiTest.java | 41 ++
.../local/GridCacheLocalTxTimeoutSelfTest.java | 5 +-
.../transactions/DepthFirstSearchTest.java | 100 +++-
.../TxDeadlockDetectionNoHangsTest.java | 246 ++++++++
.../transactions/TxDeadlockDetectionTest.java | 13 +-
...timisticDeadlockDetectionCrossCacheTest.java | 257 +++++++++
.../TxOptimisticDeadlockDetectionTest.java | 574 +++++++++++++++++++
...simisticDeadlockDetectionCrossCacheTest.java | 165 ++++--
.../TxPessimisticDeadlockDetectionTest.java | 50 +-
.../IgniteCacheFullApiSelfTestSuite.java | 4 +
.../TxDeadlockDetectionTestSuite.java | 6 +
.../commands/cache/VisorCacheStopCommand.scala | 5 +-
39 files changed, 2127 insertions(+), 355 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 57fa68e..f692bf4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -4493,17 +4493,30 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/**
- * @return All MVCC local candidates.
+ * @return All local MVCC candidates that are not near-local.
*/
+ @SuppressWarnings("ForLoopReplaceableByForEach")
@Nullable public synchronized List<GridCacheMvccCandidate> mvccAllLocal() {
GridCacheMvcc mvcc = extras != null ? extras.mvcc() : null;
if (mvcc == null)
return null;
- List<GridCacheMvccCandidate> locs = mvcc.allLocal();
+ List<GridCacheMvccCandidate> allLocs = mvcc.allLocal();
- return (locs == null || locs.isEmpty()) ? null : new ArrayList<>(locs);
+ if (allLocs == null || allLocs.isEmpty())
+ return null;
+
+ List<GridCacheMvccCandidate> locs = new ArrayList<>(allLocs.size());
+
+ for (int i = 0; i < allLocs.size(); i++) {
+ GridCacheMvccCandidate loc = allLocs.get(i);
+
+ if (!loc.nearLocal())
+ locs.add(loc);
+ }
+
+ return locs.isEmpty() ? null : locs;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index e6ab046..4eb61e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1302,6 +1302,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
for (GridCacheFuture<?> fut : mvcc.atomicFutures())
U.warn(log, ">>> " + fut);
+
+ if (tm != null) {
+ U.warn(log, "Pending transaction deadlock detection futures:");
+
+ for (IgniteInternalFuture<?> fut : tm.deadlockDetectionFutures())
+ U.warn(log, ">>> " + fut);
+ }
}
for (GridCacheContext ctx : cctx.cacheContexts()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index 72e68db..c691374 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -154,6 +154,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
/**
* @param tx Cache transaction.
+ * @param timeout Transaction timeout.
* @param reads Read entries.
* @param writes Write entries.
* @param txNodes Transaction nodes mapping.
@@ -162,6 +163,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
*/
public GridDistributedTxPrepareRequest(
IgniteInternalTx tx,
+ long timeout,
@Nullable Collection<IgniteTxEntry> reads,
Collection<IgniteTxEntry> writes,
Map<UUID, Collection<UUID>> txNodes,
@@ -174,12 +176,12 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
threadId = tx.threadId();
concurrency = tx.concurrency();
isolation = tx.isolation();
- timeout = tx.timeout();
invalidate = tx.isInvalidate();
txSize = tx.size();
sys = tx.system();
plc = tx.ioPolicy();
+ this.timeout = timeout;
this.reads = reads;
this.writes = writes;
this.txNodes = txNodes;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 64b8745..b005b29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -242,12 +242,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
msgLog = cctx.shared().txLockMessageLogger();
log = U.logger(cctx.kernalContext(), logRef, GridDhtLockFuture.class);
}
-
- if (timeout > 0) {
- timeoutObj = new LockTimeoutObject();
-
- cctx.time().addTimeoutObject(timeoutObj);
- }
}
/** {@inheritDoc} */
@@ -298,8 +292,10 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
/**
* @return Entries.
*/
- public synchronized Collection<GridDhtCacheEntry> entriesCopy() {
- return new ArrayList<>(entries());
+ public Collection<GridDhtCacheEntry> entriesCopy() {
+ synchronized (futs) {
+ return new ArrayList<>(entries());
+ }
}
/**
@@ -412,7 +408,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
return null;
}
- synchronized (this) {
+ synchronized (futs) {
entries.add(c == null || c.reentry() ? null : entry);
if (c != null && !c.reentry())
@@ -614,7 +610,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
* @param t Error.
*/
public void onError(Throwable t) {
- synchronized (this) {
+ synchronized (futs) {
if (err != null)
return;
@@ -654,15 +650,16 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
* @param entry Entry whose lock ownership changed.
*/
@Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
- if (isDone())
+ if (isDone() || (inTx() && tx.remainingTime() == -1))
return false; // Check other futures.
if (log.isDebugEnabled())
log.debug("Received onOwnerChanged() callback [entry=" + entry + ", owner=" + owner + "]");
if (owner != null && owner.version().equals(lockVer)) {
- synchronized (this) {
- pendingLocks.remove(entry.key());
+ synchronized (futs) {
+ if (!pendingLocks.remove(entry.key()))
+ return false;
}
if (checkLocks())
@@ -677,8 +674,10 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
/**
* @return {@code True} if locks have been acquired.
*/
- private synchronized boolean checkLocks() {
- return pendingLocks.isEmpty();
+ private boolean checkLocks() {
+ synchronized (futs) {
+ return pendingLocks.isEmpty();
+ }
}
/** {@inheritDoc} */
@@ -709,7 +708,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
if (isDone() || (err == null && success && !checkLocks()))
return false;
- synchronized (this) {
+ synchronized (futs) {
if (this.err == null)
this.err = err;
}
@@ -776,13 +775,19 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
}
readyLocks();
+
+ if (timeout > 0) {
+ timeoutObj = new LockTimeoutObject();
+
+ cctx.time().addTimeoutObject(timeoutObj);
+ }
}
/**
* @param entries Entries.
*/
private void map(Iterable<GridDhtCacheEntry> entries) {
- synchronized (this) {
+ synchronized (futs) {
if (mapped)
return;
@@ -842,6 +847,8 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
if (log.isDebugEnabled())
log.debug("Mapped DHT lock future [dhtMap=" + F.nodeIds(dhtMap.keySet()) + ", dhtLockFut=" + this + ']');
+ long timeout = inTx() ? tx.remainingTime() : this.timeout;
+
// Create mini futures.
for (Map.Entry<ClusterNode, List<GridDhtCacheEntry>> mapped : dhtMap.entrySet()) {
ClusterNode n = mapped.getKey();
@@ -853,6 +860,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
if (cnt > 0) {
assert !n.id().equals(cctx.localNodeId());
+ if (inTx() && tx.remainingTime() == -1)
+ return;
+
MiniFuture fut = new MiniFuture(n, dhtMapping);
GridDhtLockRequest req = new GridDhtLockRequest(
@@ -1109,7 +1119,14 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
if (log.isDebugEnabled())
log.debug("Timed out waiting for lock response: " + this);
- timedOut = true;
+ synchronized (futs) {
+ timedOut = true;
+
+ // Stop locks and responses processing.
+ pendingLocks.clear();
+
+ futs.clear();
+ }
boolean releaseLocks = !(inTx() && cctx.tm().deadlockDetectionEnabled());
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 4ece775..d2e26b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -133,6 +133,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
@Override public boolean onNodeLeft(UUID nodeId) {
for (IgniteInternalFuture<?> fut : futures())
if (isMini(fut)) {
@@ -391,8 +392,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
* @param nearMap Near map.
* @return {@code True} in case there is at least one synchronous {@code MiniFuture} to wait for.
*/
- private boolean finish(Map<UUID, GridDistributedTxMapping> dhtMap,
- Map<UUID, GridDistributedTxMapping> nearMap) {
+ private boolean finish(Map<UUID, GridDistributedTxMapping> dhtMap, Map<UUID, GridDistributedTxMapping> nearMap) {
if (tx.onePhaseCommit())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index c9d4345..b659abb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -314,6 +314,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
true);
}
+ long timeout = remainingTime();
+
// For pessimistic mode we don't distribute prepare request.
GridDhtTxPrepareFuture fut = prepFut;
@@ -322,11 +324,16 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
if (!PREP_FUT_UPD.compareAndSet(this, null, fut = new GridDhtTxPrepareFuture(
cctx,
this,
+ timeout,
nearMiniId,
Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
true,
- needReturnValue())))
+ needReturnValue()))) {
+ if (timeout == -1)
+ prepFut.onError(timeoutException());
+
return prepFut;
+ }
}
else
// Prepare was called explicitly.
@@ -334,15 +341,16 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
if (!state(PREPARING)) {
if (setRollbackOnly()) {
- if (timedOut())
- fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + this));
+ if (timeout == -1)
+ fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " +
+ this));
else
fut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" + state() +
", tx=" + this + ']'));
}
else
- fut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare [state=" + state()
- + ", tx=" + this + ']'));
+ fut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare [state=" +
+ state() + ", tx=" + this + ']'));
return fut;
}
@@ -394,6 +402,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
// In optimistic mode prepare still can be called explicitly from salvageTx.
GridDhtTxPrepareFuture fut = prepFut;
+ long timeout = remainingTime();
+
if (fut == null) {
init();
@@ -401,6 +411,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
if (!PREP_FUT_UPD.compareAndSet(this, null, fut = new GridDhtTxPrepareFuture(
cctx,
this,
+ timeout,
nearMiniId,
verMap,
last,
@@ -410,6 +421,9 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " +
"[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']';
+ if (timeout == -1)
+ f.onError(timeoutException());
+
return chainOnePhasePrepare(f);
}
}
@@ -427,7 +441,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
fut.complete();
if (setRollbackOnly()) {
- if (timedOut())
+ if (timeout == -1)
fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " +
this));
else
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index e9805aa..1bdd9b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -59,8 +59,10 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.dr.GridDrType;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
+import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -204,9 +206,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/** */
private boolean invoke;
+ /** Timeout object. */
+ private final PrepareTimeoutObject timeoutObj;
+
/**
* @param cctx Context.
* @param tx Transaction.
+ * @param timeout Timeout.
* @param nearMiniId Near mini future id.
* @param dhtVerMap DHT versions map.
* @param last {@code True} if this is last prepare operation for node.
@@ -215,6 +221,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
public GridDhtTxPrepareFuture(
GridCacheSharedContext cctx,
final GridDhtTxLocalAdapter tx,
+ long timeout,
IgniteUuid nearMiniId,
Map<IgniteTxKey, GridCacheVersion> dhtVerMap,
boolean last,
@@ -243,6 +250,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
assert dhtMap != null;
assert nearMap != null;
+
+ timeoutObj = timeout > 0 ? new PrepareTimeoutObject(timeout) : null;
}
/** {@inheritDoc} */
@@ -269,7 +278,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
boolean rmv;
- synchronized (lockKeys) {
+ synchronized (futs) {
rmv = lockKeys.remove(entry.txKey());
}
@@ -300,7 +309,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (!locksReady)
return false;
- synchronized (lockKeys) {
+ synchronized (futs) {
return lockKeys.isEmpty();
}
}
@@ -483,32 +492,28 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @param res Result.
*/
public void onResult(UUID nodeId, GridDhtTxPrepareResponse res) {
- if (!isDone()) {
- boolean found = false;
-
- MiniFuture mini = miniFuture(res.miniId());
+ if (isDone()) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DHT prepare fut, response for finished future [txId=" + tx.nearXidVersion() +
+ ", dhtTxId=" + tx.xidVersion() +
+ ", node=" + nodeId +
+ ", res=" + res +
+ ", fut=" + this + ']');
+ }
- if (mini != null) {
- found = true;
+ return;
+ }
- assert mini.node().id().equals(nodeId);
+ MiniFuture mini = miniFuture(res.miniId());
- mini.onResult(res);
- }
+ if (mini != null) {
+ assert mini.node().id().equals(nodeId);
- if (!found) {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DHT prepare fut, failed to find mini future [txId=" + tx.nearXidVersion() +
- ", dhtTxId=" + tx.xidVersion() +
- ", node=" + nodeId +
- ", res=" + res +
- ", fut=" + this + ']');
- }
- }
+ mini.onResult(res);
}
else {
if (msgLog.isDebugEnabled()) {
- msgLog.debug("DHT prepare fut, response for finished future [txId=" + tx.nearXidVersion() +
+ msgLog.debug("DHT prepare fut, failed to find mini future [txId=" + tx.nearXidVersion() +
", dhtTxId=" + tx.xidVersion() +
", node=" + nodeId +
", res=" + res +
@@ -525,8 +530,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
private MiniFuture miniFuture(IgniteUuid miniId) {
- // We iterate directly over the futs collection here to avoid copy.
synchronized (futs) {
+ // We iterate directly over the futs collection here to avoid copy.
// Avoid iterator creation.
for (int i = 0; i < futs.size(); i++) {
IgniteInternalFuture<IgniteInternalTx> fut = futs.get(i);
@@ -543,9 +548,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
return null;
}
}
- }
- return null;
+ return null;
+ }
}
/**
@@ -583,7 +588,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
if (tx.optimistic() && txEntry.explicitVersion() == null) {
- synchronized (lockKeys) {
+ synchronized (futs) {
lockKeys.add(txEntry.txKey());
}
}
@@ -934,6 +939,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
// Don't forget to clean up.
cctx.mvcc().removeMvccFuture(this);
+ if (timeoutObj != null)
+ cctx.time().removeTimeoutObject(timeoutObj);
+
return true;
}
@@ -989,6 +997,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
readyLocks();
+ if (timeoutObj != null) {
+ // Start timeout tracking after 'readyLocks' to avoid race with timeout processing.
+ cctx.time().addTimeoutObject(timeoutObj);
+ }
+
mapIfLocked();
}
@@ -1158,6 +1171,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (last) {
assert tx.transactionNodes() != null;
+ final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
+
// Create mini futures.
for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) {
assert !dhtMapping.empty();
@@ -1175,6 +1190,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (F.isEmpty(dhtWrites) && F.isEmpty(nearWrites))
continue;
+ if (tx.remainingTime() == -1)
+ return;
+
MiniFuture fut = new MiniFuture(n.id(), dhtMapping, nearMapping);
add(fut); // Append new future.
@@ -1186,6 +1204,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
fut.futureId(),
tx.topologyVersion(),
tx,
+ timeout,
dhtWrites,
nearWrites,
txNodes,
@@ -1284,15 +1303,19 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
if (!tx.dhtMap().containsKey(nearMapping.node().id())) {
+ if (tx.remainingTime() == -1)
+ return;
+
MiniFuture fut = new MiniFuture(nearMapping.node().id(), null, nearMapping);
- add(fut); // Append new future.
+ add(fut);
GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
futId,
fut.futureId(),
tx.topologyVersion(),
tx,
+ timeout,
null,
nearMapping.writes(),
tx.transactionNodes(),
@@ -1719,4 +1742,38 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
}
}
+
+ /**
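+ * Timeout object that stops prepare processing and fails this future with a transaction timeout error.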
+ */
+ private class PrepareTimeoutObject extends GridTimeoutObjectAdapter {
+ /** */
+ private final long timeout;
+
+ /**
+ * @param timeout Timeout.
+ */
+ PrepareTimeoutObject(long timeout) {
+ super(timeout);
+
+ this.timeout = timeout;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
+ synchronized (futs) {
+ futs.clear();
+
+ lockKeys.clear();
+ }
+
+ onError(new IgniteTxTimeoutCheckedException("Failed to acquire lock within " +
+ "provided timeout for transaction [timeout=" + tx.timeout() + ", tx=" + tx + ']'));
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(PrepareTimeoutObject.class, this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index d31ecba..1cdc96f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -112,6 +112,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
* @param miniId Mini future ID.
* @param topVer Topology version.
* @param tx Transaction.
+ * @param timeout Transaction timeout.
* @param dhtWrites DHT writes.
* @param nearWrites Near writes.
* @param txNodes Transaction nodes mapping.
@@ -124,6 +125,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
IgniteUuid miniId,
AffinityTopologyVersion topVer,
GridDhtTxLocalAdapter tx,
+ long timeout,
Collection<IgniteTxEntry> dhtWrites,
Collection<IgniteTxEntry> nearWrites,
Map<UUID, Collection<UUID>> txNodes,
@@ -133,7 +135,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
UUID subjId,
int taskNameHash,
boolean addDepInfo) {
- super(tx, null, dhtWrites, txNodes, onePhaseCommit, addDepInfo);
+ super(tx, timeout, null, dhtWrites, txNodes, onePhaseCommit, addDepInfo);
assert futId != null;
assert miniId != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index f77efee..b0eea01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -443,23 +443,33 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
/**
* @return Keys for which locks requested from remote nodes but response isn't received.
*/
- public Set<KeyCacheObject> requestedKeys() {
- Set<KeyCacheObject> requestedKeys = null;
+ public Set<IgniteTxKey> requestedKeys() {
+ synchronized (futs) {
+ if (timeoutObj != null && timeoutObj.requestedKeys != null)
+ return timeoutObj.requestedKeys;
+
+ return requestedKeys0();
+ }
+ }
+ /**
+ * @return Keys for which locks requested from remote nodes but response isn't received.
+ */
+ private Set<IgniteTxKey> requestedKeys0() {
for (IgniteInternalFuture<Boolean> miniFut : futures()) {
if (isMini(miniFut) && !miniFut.isDone()) {
- if (requestedKeys == null)
- requestedKeys = new HashSet<>();
-
MiniFuture mini = (MiniFuture)miniFut;
- requestedKeys.addAll(mini.keys);
+ Set<IgniteTxKey> requestedKeys = U.newHashSet(mini.keys.size());
+
+ for (KeyCacheObject key : mini.keys)
+ requestedKeys.add(new IgniteTxKey(key, cctx.cacheId()));
return requestedKeys;
}
}
- return requestedKeys;
+ return null;
}
/**
@@ -1312,12 +1322,21 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
super(timeout);
}
+ /** Requested keys. */
+ private Set<IgniteTxKey> requestedKeys;
+
/** {@inheritDoc} */
@Override public void onTimeout() {
if (log.isDebugEnabled())
log.debug("Timed out waiting for lock response: " + this);
if (inTx() && cctx.tm().deadlockDetectionEnabled()) {
+ synchronized (futs) {
+ requestedKeys = requestedKeys0();
+
+ futs.clear(); // Stop response processing.
+ }
+
Set<IgniteTxKey> keys = new HashSet<>();
for (IgniteTxEntry txEntry : tx.allEntries()) {
@@ -1434,7 +1453,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
tx.removeMapping(node.id());
// Primary node left the grid, so fail the future.
- GridDhtColocatedLockFuture.this.onDone(newTopologyException(e, node.id()));
+ GridDhtColocatedLockFuture.this.onDone(false, newTopologyException(e, node.id()));
onDone(true);
}
@@ -1494,7 +1513,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
else
remap();
}
- else {
+ else {
int i = 0;
for (KeyCacheObject k : keys) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 4b6448b..3d9b6ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -20,10 +20,12 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
@@ -48,8 +50,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
+import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
+import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -63,7 +67,9 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -481,6 +487,38 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
}
/**
+ * Returns the keys for which locks were requested from remote nodes but no response has been
+ * received yet. A key set already stored on the timeout object takes precedence; otherwise the
+ * result is computed by {@code requestedKeys0()}.
+ *
+ * @return Requested keys still awaiting a response, or {@code null} if {@code requestedKeys0()}
+ *      finds no pending mini future.
+ */
+ public Set<IgniteTxKey> requestedKeys() {
+     synchronized (futs) {
+         Set<IgniteTxKey> captured = timeoutObj == null ? null : timeoutObj.requestedKeys;
+
+         return captured != null ? captured : requestedKeys0();
+     }
+ }
+
+ /**
+ * Looks for the first mini future that is not done yet and converts its keys to transaction keys
+ * of this cache.
+ *
+ * @return Keys of the first pending mini future, i.e. keys for which locks were requested from
+ *      a remote node but no response has been received, or {@code null} if no mini future is pending.
+ */
+ private Set<IgniteTxKey> requestedKeys0() {
+     for (IgniteInternalFuture<Boolean> fut : futures()) {
+         // Only unfinished mini futures are of interest.
+         if (!isMini(fut) || fut.isDone())
+             continue;
+
+         MiniFuture pending = (MiniFuture)fut;
+
+         Set<IgniteTxKey> txKeys = U.newHashSet(pending.keys.size());
+
+         for (KeyCacheObject cacheKey : pending.keys)
+             txKeys.add(new IgniteTxKey(cacheKey, cctx.cacheId()));
+
+         return txKeys;
+     }
+
+     return null;
+ }
+
+ /**
* Finds pending mini future by the given mini ID.
*
* @param miniId Mini ID to find.
@@ -621,6 +659,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
if (log.isDebugEnabled())
log.debug("Received onDone(..) callback [success=" + success + ", err=" + err + ", fut=" + this + ']');
+ if (inTx() && cctx.tm().deadlockDetectionEnabled() &&
+ (this.err instanceof IgniteTxTimeoutCheckedException || timedOut))
+ return false;
+
// If locks were not acquired yet, delay completion.
if (isDone() || (err == null && success && !checkLocks()))
return false;
@@ -727,7 +769,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
topVer = tx.topologyVersionSnapshot();
if (topVer != null) {
- for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()){
+ for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()) {
if (fut.topologyVersion().equals(topVer)){
Throwable err = fut.validateCache(cctx);
@@ -1373,6 +1415,9 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
super(timeout);
}
+ /** Requested keys. */
+ private Set<IgniteTxKey> requestedKeys;
+
/** {@inheritDoc} */
@SuppressWarnings({"ThrowableInstanceNeverThrown"})
@Override public void onTimeout() {
@@ -1381,7 +1426,42 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
timedOut = true;
- onComplete(false, true);
+ if (inTx() && cctx.tm().deadlockDetectionEnabled()) {
+ synchronized (futs) {
+ requestedKeys = requestedKeys0();
+
+ futs.clear(); // Stop response processing.
+ }
+
+ Set<IgniteTxKey> keys = new HashSet<>();
+
+ for (IgniteTxEntry txEntry : tx.allEntries()) {
+ if (!txEntry.locked())
+ keys.add(txEntry.txKey());
+ }
+
+ IgniteInternalFuture<TxDeadlock> fut = cctx.tm().detectDeadlock(tx, keys);
+
+ fut.listen(new IgniteInClosure<IgniteInternalFuture<TxDeadlock>>() {
+ @Override public void apply(IgniteInternalFuture<TxDeadlock> fut) {
+ try {
+ TxDeadlock deadlock = fut.get();
+
+ if (deadlock != null)
+ err = new TransactionDeadlockException(deadlock.toString(cctx.shared()));
+ }
+ catch (IgniteCheckedException e) {
+ err = e;
+
+ U.warn(log, "Failed to detect deadlock.", e);
+ }
+
+ onComplete(false, true);
+ }
+ });
+ }
+ else
+ onComplete(false, true);
}
/** {@inheritDoc} */
@@ -1466,7 +1546,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
tx.removeMapping(node.id());
// Primary node left the grid, so fail the future.
- GridNearLockFuture.this.onDone(newTopologyException(e, node.id()));
+ GridNearLockFuture.this.onDone(false, newTopologyException(e, node.id()));
onDone(true);
}
@@ -1483,6 +1563,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
}
if (res.error() != null) {
+ if (inTx() && cctx.tm().deadlockDetectionEnabled() &&
+ (res.error() instanceof IgniteTxTimeoutCheckedException || tx.remainingTime() == -1))
+ return;
+
if (log.isDebugEnabled())
log.debug("Finishing mini future with an error due to error in response [miniFut=" + this +
", res=" + res + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 6515140..d251528 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -184,7 +184,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
}
- if (e instanceof IgniteTxOptimisticCheckedException) {
+ if (e instanceof IgniteTxOptimisticCheckedException || e instanceof IgniteTxTimeoutCheckedException) {
if (m != null)
tx.removeMapping(m.node().id());
}
@@ -424,10 +424,21 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
final ClusterNode n = m.node();
+ long timeout = tx.remainingTime();
+
+ if (timeout == -1) {
+ IgniteCheckedException err = tx.timeoutException();
+
+ fut.onResult(err);
+
+ return err;
+ }
+
GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
futId,
tx.topologyVersion(),
tx,
+ timeout,
m.reads(),
m.writes(),
m.near(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 1ea99c4..5a300ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -20,9 +20,11 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -41,8 +43,11 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
+import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -53,7 +58,9 @@ import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.transactions.TransactionTimeoutException;
import org.jetbrains.annotations.Nullable;
@@ -73,8 +80,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
* @param cctx Context.
* @param tx Transaction.
*/
- public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx,
- GridNearTxLocal tx) {
+ public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) {
super(cctx, tx);
assert tx.optimistic() && !tx.serializable() : tx;
@@ -85,7 +91,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
if (log.isDebugEnabled())
log.debug("Transaction future received owner changed callback: " + entry);
- if ((entry.context().isNear() || entry.context().isLocal()) && owner != null && tx.hasWriteKey(entry.txKey())) {
+ if (tx.remainingTime() == -1)
+ return false;
+
+ if ((entry.context().isNear() || entry.context().isLocal()) &&
+ owner != null && tx.hasWriteKey(entry.txKey())) {
if (keyLockFut != null)
keyLockFut.onKeyLocked(entry.txKey());
@@ -124,6 +134,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
* @param discoThread {@code True} if executed from discovery thread.
*/
void onError(Throwable e, boolean discoThread) {
+ if (e instanceof IgniteTxTimeoutCheckedException) {
+ onTimeout();
+
+ return;
+ }
+
if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) {
if (tx.onePhaseCommit()) {
tx.markForBackupCheck();
@@ -160,7 +176,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
if (mini != null) {
assert mini.node().id().equals(nodeId);
- mini.onResult(nodeId, res);
+ mini.onResult(res);
}
else {
if (msgLog.isDebugEnabled()) {
@@ -182,6 +198,33 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
/**
+ * Scans the registered futures, under the {@code futs} monitor, for the first {@link MiniFuture}
+ * that is not completed and collects the transaction keys of the entries in its mapping.
+ *
+ * @return Keys of the first incomplete {@link MiniFuture}, or {@code null} if there is none.
+ */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ public Set<IgniteTxKey> requestedKeys() {
+     synchronized (futs) {
+         for (int idx = 0; idx < futs.size(); idx++) {
+             IgniteInternalFuture<GridNearTxPrepareResponse> candidate = futs.get(idx);
+
+             // Skip everything except unfinished mini futures.
+             if (!isMini(candidate) || candidate.isDone())
+                 continue;
+
+             Collection<IgniteTxEntry> pendingEntries = ((MiniFuture)candidate).mapping().entries();
+
+             Set<IgniteTxKey> res = U.newHashSet(pendingEntries.size());
+
+             for (IgniteTxEntry txEntry : pendingEntries)
+                 res.add(txEntry.txKey());
+
+             return res;
+         }
+     }
+
+     return null;
+ }
+
+ /**
* Finds pending mini future by the given mini ID.
*
* @param miniId Mini ID to find.
@@ -264,7 +307,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
if (!txStateCheck) {
if (tx.setRollbackOnly()) {
- if (tx.timedOut())
+ if (tx.remainingTime() == -1)
onError(new IgniteTxTimeoutCheckedException("Transaction timed out and " +
"was rolled back: " + this), false);
else
@@ -437,89 +480,97 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
final ClusterNode n = m.node();
- GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
- futId,
- tx.topologyVersion(),
- tx,
- null,
- m.writes(),
- m.near(),
- txMapping.transactionNodes(),
- m.last(),
- tx.onePhaseCommit(),
- tx.needReturnValue() && tx.implicit(),
- tx.implicitSingle(),
- m.explicitLock(),
- tx.subjectId(),
- tx.taskNameHash(),
- m.clientFirst(),
- tx.activeCachesDeploymentEnabled());
-
- for (IgniteTxEntry txEntry : m.entries()) {
- if (txEntry.op() == TRANSFORM)
- req.addDhtVersion(txEntry.txKey(), null);
- }
+ long timeout = tx.remainingTime();
+
+ if (timeout != -1) {
+ GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
+ futId,
+ tx.topologyVersion(),
+ tx,
+ timeout,
+ null,
+ m.writes(),
+ m.near(),
+ txMapping.transactionNodes(),
+ m.last(),
+ tx.onePhaseCommit(),
+ tx.needReturnValue() && tx.implicit(),
+ tx.implicitSingle(),
+ m.explicitLock(),
+ tx.subjectId(),
+ tx.taskNameHash(),
+ m.clientFirst(),
+ tx.activeCachesDeploymentEnabled());
+
+ for (IgniteTxEntry txEntry : m.entries()) {
+ if (txEntry.op() == TRANSFORM)
+ req.addDhtVersion(txEntry.txKey(), null);
+ }
- // Must lock near entries separately.
- if (m.near()) {
- try {
- tx.optimisticLockEntries(req.writes());
+ // Must lock near entries separately.
+ if (m.near()) {
+ try {
+ tx.optimisticLockEntries(req.writes());
- tx.userPrepare();
- }
- catch (IgniteCheckedException e) {
- onError(e, false);
+ tx.userPrepare();
+ }
+ catch (IgniteCheckedException e) {
+ onError(e, false);
+ }
}
- }
- final MiniFuture fut = new MiniFuture(this, m, mappings);
+ final MiniFuture fut = new MiniFuture(this, m, mappings);
- req.miniId(fut.futureId());
+ req.miniId(fut.futureId());
- add(fut); // Append new future.
+ add(fut); // Append new future.
- // If this is the primary node for the keys.
- if (n.isLocal()) {
- // At this point, if any new node joined, then it is
- // waiting for this transaction to complete, so
- // partition reassignments are not possible here.
- IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req);
+ // If this is the primary node for the keys.
+ if (n.isLocal()) {
+ // At this point, if any new node joined, then it is
+ // waiting for this transaction to complete, so
+ // partition reassignments are not possible here.
+ IgniteInternalFuture<GridNearTxPrepareResponse> prepFut =
+ cctx.tm().txHandler().prepareTx(n.id(), tx, req);
- prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
- @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
- try {
- fut.onResult(n.id(), prepFut.get());
+ prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+ @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+ try {
+ fut.onResult(prepFut.get());
+ }
+ catch (IgniteCheckedException e) {
+ fut.onResult(e);
+ }
}
- catch (IgniteCheckedException e) {
- fut.onResult(e);
+ });
+ }
+ else {
+ try {
+ cctx.io().send(n, req, tx.ioPolicy());
+
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Near optimistic prepare fut, sent request [txId=" + tx.nearXidVersion() +
+ ", node=" + n.id() + ']');
}
}
- });
- }
- else {
- try {
- cctx.io().send(n, req, tx.ioPolicy());
+ catch (ClusterTopologyCheckedException e) {
+ e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Near optimistic prepare fut, sent request [txId=" + tx.nearXidVersion() +
- ", node=" + n.id() + ']');
+ fut.onNodeLeft(e, false);
}
- }
- catch (ClusterTopologyCheckedException e) {
- e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+ catch (IgniteCheckedException e) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Near optimistic prepare fut, failed to sent request [txId=" + tx.nearXidVersion() +
+ ", node=" + n.id() +
+ ", err=" + e + ']');
+ }
- fut.onNodeLeft(e, false);
- }
- catch (IgniteCheckedException e) {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Near optimistic prepare fut, failed to sent request [txId=" + tx.nearXidVersion() +
- ", node=" + n.id() +
- ", err=" + e + ']');
+ fut.onResult(e);
}
-
- fut.onResult(e);
}
}
+ else
+ onTimeout();
}
finally {
if (set)
@@ -623,6 +674,61 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
return cur;
}
+ /**
+ * Handles expiration of the transaction timeout for this prepare future.
+ * <p>
+ * If deadlock detection is enabled, detection is started for the lock keys of the key lock future
+ * or, when there is no key lock future, for the keys returned by {@link #requestedKeys()}. When
+ * detection finishes, this future is completed with an {@link IgniteTxTimeoutCheckedException}
+ * whose cause is a {@link TransactionDeadlockException} if a deadlock was found. If detection
+ * itself failed, a warning is logged and this future is completed with that failure.
+ * <p>
+ * If deadlock detection is disabled, the timeout exception is stored as the future error (unless
+ * an error is already set) and {@code onComplete(false)} is invoked.
+ */
+ private void onTimeout() {
+     if (cctx.tm().deadlockDetectionEnabled()) {
+         Set<IgniteTxKey> keys;
+
+         if (keyLockFut != null)
+             keys = new HashSet<>(keyLockFut.lockKeys);
+         else {
+             // Reuse requestedKeys(): it collects the keys of the first pending mini future while
+             // holding the monitor of 'futs', instead of scanning the list unsynchronized here.
+             keys = requestedKeys();
+         }
+
+         add(new GridEmbeddedFuture<>(new IgniteBiClosure<TxDeadlock, Exception, GridNearTxPrepareResponse>() {
+             @Override public GridNearTxPrepareResponse apply(TxDeadlock deadlock, Exception e) {
+                 if (e != null)
+                     U.warn(log, "Failed to detect deadlock.", e);
+                 else {
+                     // Detection succeeded: report the timeout, with the deadlock (if any) as the cause.
+                     e = new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout for " +
+                         "transaction [timeout=" + tx.timeout() + ", tx=" + tx + ']',
+                         deadlock != null ? new TransactionDeadlockException(deadlock.toString(cctx)) : null);
+                 }
+
+                 onDone(null, e);
+
+                 return null;
+             }
+         }, cctx.tm().detectDeadlock(tx, keys)));
+     }
+     else {
+         ERR_UPD.compareAndSet(this, null, new IgniteTxTimeoutCheckedException("Failed to acquire lock " +
+             "within provided timeout for transaction [timeout=" + tx.timeout() + ", tx=" + tx + ']'));
+
+         onComplete(false);
+     }
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
@@ -652,7 +758,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
private static final long serialVersionUID = 0L;
/** Receive result flag updater. */
- private static AtomicIntegerFieldUpdater<MiniFuture> RCV_RES_UPD =
+ private static final AtomicIntegerFieldUpdater<MiniFuture> RCV_RES_UPD =
AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes");
/** Parent future. */
@@ -745,15 +851,21 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
/**
- * @param nodeId Failed node ID.
* @param res Result callback.
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- void onResult(UUID nodeId, final GridNearTxPrepareResponse res) {
+ void onResult(final GridNearTxPrepareResponse res) {
if (isDone())
return;
if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
+ if (parent.cctx.tm().deadlockDetectionEnabled() &&
+ (parent.tx.remainingTime() == -1 || res.error() instanceof IgniteTxTimeoutCheckedException)) {
+ parent.onTimeout();
+
+ return;
+ }
+
if (res.error() != null) {
// Fail the whole compound future.
parent.onError(res.error(), false);
@@ -801,8 +913,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
*/
private void remap() {
parent.prepareOnTopology(true, new Runnable() {
- @Override
- public void run() {
+ @Override public void run() {
onDone((GridNearTxPrepareResponse) null);
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index 4d77a3c..a00cf3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -40,8 +40,7 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
* @param cctx Context.
* @param tx Transaction.
*/
- public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext cctx,
- GridNearTxLocal tx) {
+ public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext cctx, GridNearTxLocal tx) {
super(cctx, tx);
assert tx.optimistic() : tx;
@@ -172,7 +171,7 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
protected static class KeyLockFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
/** */
@GridToStringInclude
- private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+ protected Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
/** */
private volatile boolean allKeysAdded;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index ef2edc9..34b8281 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -153,7 +153,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
@Override public void prepare() {
if (!tx.state(PREPARING)) {
if (tx.setRollbackOnly()) {
- if (tx.timedOut())
+ if (tx.remainingTime() == -1)
onDone(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + tx));
else
onDone(new IgniteCheckedException("Invalid transaction state for prepare " +
@@ -222,6 +222,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
checkOnePhase();
+ long timeout = tx.remainingTime();
+
+ if (timeout == -1)
+ onDone(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + tx));
+
for (final GridDistributedTxMapping m : mappings.values()) {
final ClusterNode node = m.node();
@@ -229,6 +234,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
futId,
tx.topologyVersion(),
tx,
+ timeout,
m.reads(),
m.writes(),
m.near(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index adde63c..bb5d482 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -633,6 +633,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (m.explicitLock())
syncMode = FULL_SYNC;
+ // Version to be added in completed versions on primary node.
+ GridCacheVersion completedVer = !commit && tx.timeout() > 0 ? tx.xidVersion() : null;
+
GridNearTxFinishRequest req = new GridNearTxFinishRequest(
futId,
tx.xidVersion(),
@@ -645,7 +648,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
m.explicitLock(),
tx.storeEnabled(),
tx.topologyVersion(),
- null,
+ completedVer, // Reuse 'baseVersion' to do not add new fields in message.
null,
null,
tx.size(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 28c60d4..410baf8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -796,6 +796,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)prepFut;
if (fut == null) {
+ long timeout = remainingTime();
+
// Future must be created before any exception can be thrown.
if (optimistic()) {
fut = serializable() ?
@@ -807,6 +809,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
if (!PREP_FUT_UPD.compareAndSet(this, null, fut))
return prepFut;
+
+ if (timeout == -1) {
+ fut.onDone(this, timeoutException());
+
+ return fut;
+ }
}
else
// Prepare was called explicitly.
@@ -964,8 +972,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
Map<UUID, Collection<UUID>> txNodes,
boolean last
) {
+ long timeout = remainingTime();
+
if (state() != PREPARING) {
- if (timedOut())
+ if (timeout == -1)
return new GridFinishedFuture<>(
new IgniteTxTimeoutCheckedException("Transaction timed out: " + this));
@@ -975,11 +985,15 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
new IgniteCheckedException("Invalid transaction state for prepare [state=" + state() + ", tx=" + this + ']'));
}
+ if (timeout == -1)
+ return new GridFinishedFuture<>(timeoutException());
+
init();
GridDhtTxPrepareFuture fut = new GridDhtTxPrepareFuture(
cctx,
this,
+ timeout,
IgniteUuid.randomUuid(),
Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
last,
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 9dfdb43..e55566b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -94,6 +94,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
* @param futId Future ID.
* @param topVer Topology version.
* @param tx Transaction.
+ * @param timeout Transaction timeout.
* @param reads Read entries.
* @param writes Write entries.
* @param near {@code True} if mapping is for near caches.
@@ -112,6 +113,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
IgniteUuid futId,
AffinityTopologyVersion topVer,
IgniteInternalTx tx,
+ long timeout,
Collection<IgniteTxEntry> reads,
Collection<IgniteTxEntry> writes,
boolean near,
@@ -126,7 +128,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
boolean firstClientReq,
boolean addDepInfo
) {
- super(tx, reads, writes, txNodes, onePhaseCommit, addDepInfo);
+ super(tx, timeout, reads, writes, txNodes, onePhaseCommit, addDepInfo);
assert futId != null;
assert !firstClientReq || tx.optimistic() : tx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 8c0425d..dd900fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -33,7 +33,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedExceptio
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.lang.GridTuple;
import org.apache.ignite.lang.IgniteAsyncSupported;
@@ -46,7 +45,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Transaction managed by cache ({@code 'Ex'} stands for external).
*/
-public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
+public interface IgniteInternalTx extends AutoCloseable {
/**
*
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index f76f4bf..eb2989e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -712,7 +712,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
/**
* @return Transaction timeout exception.
*/
- protected final IgniteCheckedException timeoutException() {
+ public final IgniteCheckedException timeoutException() {
return new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout " +
"for transaction [timeout=" + timeout() + ", tx=" + this + ']');
}
@@ -1032,7 +1032,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
* @return {@code True} if state changed.
*/
@SuppressWarnings({"TooBroadScope"})
- private boolean state(TransactionState state, boolean timedOut) {
+ protected boolean state(TransactionState state, boolean timedOut) {
boolean valid = false;
TransactionState prev;
@@ -1154,24 +1154,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
- @Override public IgniteUuid timeoutId() {
- return xidVer.asGridUuid();
- }
-
- /** {@inheritDoc} */
- @Override public long endTime() {
- long endTime = timeout == 0 ? Long.MAX_VALUE : startTime + timeout;
-
- return endTime > 0 ? endTime : endTime < 0 ? Long.MAX_VALUE : endTime;
- }
-
- /** {@inheritDoc} */
- @Override public void onTimeout() {
- if (local() && !dht())
- state(MARKED_ROLLBACK, true);
- }
-
- /** {@inheritDoc} */
@Override public boolean timedOut() {
return timedOut;
}
@@ -2387,21 +2369,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
- @Override public IgniteUuid timeoutId() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public long endTime() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public void onTimeout() {
- // No-op.
- }
-
- /** {@inheritDoc} */
@Override public boolean equals(Object o) {
return this == o || o instanceof IgniteInternalTx && xid.equals(((IgniteInternalTx)o).xid());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 7c3c206..e67e60f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -112,8 +112,7 @@ public class IgniteTxHandler {
* @param req Request.
* @return Prepare future.
*/
- public IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId,
- final GridNearTxPrepareRequest req) {
+ public IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) {
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Received near prepare request [txId=" + req.version() +
", node=" + nearNodeId + ']');
@@ -361,7 +360,7 @@ public class IgniteTxHandler {
req.deployInfo() != null);
try {
- ctx.io().send(nearNode, res, req.policy());
+ ctx.io().send(nearNodeId, res, req.policy());
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Sent remap response for near prepare [txId=" + req.version() +
@@ -667,6 +666,10 @@ public class IgniteTxHandler {
assert nodeId != null;
assert req != null;
+ // 'baseVersion' message field is re-used for version to be added in completed versions.
+ if (!req.commit() && req.baseVersion() != null)
+ ctx.tm().addRolledbackTx(null, req.baseVersion());
+
// Transaction on local cache only.
if (locTx != null && !locTx.nearLocallyMapped() && !locTx.colocatedLocallyMapped())
return new GridFinishedFuture<IgniteInternalTx>(locTx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index d9aca4a..9ad7fb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -107,6 +107,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRIMARY;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;
import static org.apache.ignite.transactions.TransactionState.COMMITTING;
+import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
import static org.apache.ignite.transactions.TransactionState.PREPARING;
import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
@@ -547,14 +548,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
@SuppressWarnings({"CatchGenericClass"})
public void userPrepare() throws IgniteCheckedException {
if (state() != PREPARING) {
- if (timedOut())
+ if (remainingTime() == -1)
throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this);
TransactionState state = state();
setRollbackOnly();
- throw new IgniteCheckedException("Invalid transaction state for prepare [state=" + state + ", tx=" + this + ']');
+ throw new IgniteCheckedException("Invalid transaction state for prepare [state=" +
+ state + ", tx=" + this + ']');
}
checkValid();
@@ -629,7 +631,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
TransactionState state = state();
if (state != COMMITTING) {
- if (timedOut())
+ if (remainingTime() == -1)
throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this);
setRollbackOnly();
@@ -3540,8 +3542,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
* @throws IgniteCheckedException If transaction check failed.
*/
protected void checkValid() throws IgniteCheckedException {
+ if (local() && !dht() && remainingTime() == -1)
+ state(MARKED_ROLLBACK, true);
+
if (isRollbackOnly()) {
- if (timedOut())
+ if (remainingTime() == -1)
throw new IgniteTxTimeoutCheckedException("Cache transaction timed out: " + this);
TransactionState state = state();
@@ -3556,10 +3561,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
throw new IgniteCheckedException("Cache transaction marked as rollback-only: " + this);
}
-
- if (remainingTime() == -1 && setRollbackOnly())
- throw new IgniteTxTimeoutCheckedException("Cache transaction timed out " +
- "(was rolled back automatically): " + this);
}
/** {@inheritDoc} */
@@ -3604,7 +3605,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
TransactionState state = state();
- assert state == TransactionState.ACTIVE || timedOut() :
+ assert state == TransactionState.ACTIVE || remainingTime() == -1 :
"Invalid tx state for adding entry [op=" + op + ", val=" + val + ", entry=" + entry + ", filter=" +
Arrays.toString(filter) + ", txCtx=" + cctx.tm().txContextVersion() + ", tx=" + this + ']';