You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/23 16:03:53 UTC
[5/6] ignite git commit: IGNITE-6827 Configurable rollback for long
running transactions before partition exchange IGNITE-7910 Improved
transaction debugging support
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index b4447b7..2869bb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -67,6 +68,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.NotNull;
@@ -259,6 +261,38 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo
msgLog = cctx.shared().txLockMessageLogger();
log = U.logger(cctx.kernalContext(), logRef, GridDhtLockFuture.class);
}
+
+ if (tx != null) {
+ while(true) {
+ IgniteInternalFuture<Boolean> fut = tx.lockFut;
+
+ if (fut != null) {
+ if (fut == GridDhtTxLocalAdapter.ROLLBACK_FUT)
+ onError(tx.timedOut() ? tx.timeoutException() : tx.rollbackException());
+ else {
+ // Wait for collocated lock future.
+ assert fut instanceof GridDhtColocatedLockFuture : fut;
+
+ // Terminate this future if parent(collocated) future is terminated by rollback.
+ fut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+ try {
+ fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ onError(e);
+ }
+ }
+ });
+ }
+
+ return;
+ }
+
+ if (tx.updateLockFuture(null, this))
+ return;
+ }
+ }
}
/** {@inheritDoc} */
@@ -650,19 +684,23 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo
* @param entry Entry whose lock ownership changed.
*/
@Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
- if (isDone() || (inTx() && tx.remainingTime() == -1))
+ if (isDone() || (inTx() && (tx.remainingTime() == -1 || tx.isRollbackOnly())))
return false; // Check other futures.
if (log.isDebugEnabled())
log.debug("Received onOwnerChanged() callback [entry=" + entry + ", owner=" + owner + "]");
if (owner != null && owner.version().equals(lockVer)) {
+ boolean isEmpty;
+
synchronized (this) {
if (!pendingLocks.remove(entry.key()))
return false;
+
+ isEmpty = pendingLocks.isEmpty();
}
- if (checkLocks())
+ if (isEmpty)
map(entries());
return true;
@@ -722,7 +760,7 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo
* @param unlock {@code True} if locks should be released.
* @return {@code True} if complete by this operation.
*/
- private boolean onComplete(boolean success, boolean stopping, boolean unlock) {
+ private synchronized boolean onComplete(boolean success, boolean stopping, boolean unlock) {
if (log.isDebugEnabled())
log.debug("Received onComplete(..) callback [success=" + success + ", fut=" + this + ']');
@@ -735,6 +773,9 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo
cctx.tm().txContext(tx);
set = cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot());
+
+ if (success)
+ tx.clearLockFuture(this);
}
try {
@@ -774,7 +815,7 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo
readyLocks();
- if (timeout > 0) {
+ if (timeout > 0 && !isDone()) { // Prevent memory leak if future is completed by call to readyLocks.
timeoutObj = new LockTimeoutObject();
cctx.time().addTimeoutObject(timeoutObj);
@@ -782,6 +823,21 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo
}
/**
+ *
+ * @return {@code True} if future is done.
+ */
+ private boolean checkDone() {
+ if (isDone()) {
+ if (log.isDebugEnabled())
+ log.debug("Mapping won't proceed because future is done: " + this);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
* @param entries Entries.
*/
private void map(Iterable<GridDhtCacheEntry> entries) {
@@ -835,139 +891,146 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo
}
}
- if (isDone()) {
- if (log.isDebugEnabled())
- log.debug("Mapping won't proceed because future is done: " + this);
-
+ if (checkDone())
return;
- }
if (log.isDebugEnabled())
log.debug("Mapped DHT lock future [dhtMap=" + F.nodeIds(dhtMap.keySet()) + ", dhtLockFut=" + this + ']');
long timeout = inTx() ? tx.remainingTime() : this.timeout;
- // Create mini futures.
- for (Map.Entry<ClusterNode, List<GridDhtCacheEntry>> mapped : dhtMap.entrySet()) {
- ClusterNode n = mapped.getKey();
+ synchronized (this) { // Prevents entry removal on concurrent rollback.
+ if (checkDone())
+ return;
+
+ // Create mini futures.
+ for (Map.Entry<ClusterNode, List<GridDhtCacheEntry>> mapped : dhtMap.entrySet()) {
+ ClusterNode n = mapped.getKey();
- List<GridDhtCacheEntry> dhtMapping = mapped.getValue();
+ List<GridDhtCacheEntry> dhtMapping = mapped.getValue();
- int cnt = F.size(dhtMapping);
+ int cnt = F.size(dhtMapping);
- if (cnt > 0) {
- assert !n.id().equals(cctx.localNodeId());
+ if (cnt > 0) {
+ assert !n.id().equals(cctx.localNodeId());
- if (inTx() && tx.remainingTime() == -1)
- return;
+ if (inTx() && tx.remainingTime() == -1)
+ return;
- MiniFuture fut = new MiniFuture(n, dhtMapping);
-
- GridDhtLockRequest req = new GridDhtLockRequest(
- cctx.cacheId(),
- nearNodeId,
- inTx() ? tx.nearXidVersion() : null,
- threadId,
- futId,
- fut.futureId(),
- lockVer,
- topVer,
- inTx(),
- read,
- isolation(),
- isInvalidate(),
- timeout,
- cnt,
- 0,
- inTx() ? tx.size() : cnt,
- inTx() ? tx.subjectId() : null,
- inTx() ? tx.taskNameHash() : 0,
- read ? accessTtl : -1L,
- skipStore,
- cctx.store().configured(),
- keepBinary,
- cctx.deploymentEnabled());
-
- try {
- for (ListIterator<GridDhtCacheEntry> it = dhtMapping.listIterator(); it.hasNext();) {
- GridDhtCacheEntry e = it.next();
-
- boolean needVal = false;
+ MiniFuture fut = new MiniFuture(n, dhtMapping);
+
+ GridDhtLockRequest req = new GridDhtLockRequest(
+ cctx.cacheId(),
+ nearNodeId,
+ inTx() ? tx.nearXidVersion() : null,
+ threadId,
+ futId,
+ fut.futureId(),
+ lockVer,
+ topVer,
+ inTx(),
+ read,
+ isolation(),
+ isInvalidate(),
+ timeout,
+ cnt,
+ 0,
+ inTx() ? tx.size() : cnt,
+ inTx() ? tx.subjectId() : null,
+ inTx() ? tx.taskNameHash() : 0,
+ read ? accessTtl : -1L,
+ skipStore,
+ cctx.store().configured(),
+ keepBinary,
+ cctx.deploymentEnabled());
- try {
- // Must unswap entry so that isNewLocked returns correct value.
- e.unswap(false);
+ try {
+ for (ListIterator<GridDhtCacheEntry> it = dhtMapping.listIterator(); it.hasNext(); ) {
+ GridDhtCacheEntry e = it.next();
- needVal = e.isNewLocked();
+ boolean needVal = false;
- if (needVal) {
- List<ClusterNode> owners = cctx.topology().owners(e.partition(),
- tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion());
+ try {
+ // Must unswap entry so that isNewLocked returns correct value.
+ e.unswap(false);
- // Do not preload if local node is partition owner.
- if (owners.contains(cctx.localNode()))
- needVal = false;
- }
- }
- catch (GridCacheEntryRemovedException ex) {
- assert false : "Entry cannot become obsolete when DHT local candidate is added " +
- "[e=" + e + ", ex=" + ex + ']';
- }
+ needVal = e.isNewLocked();
+
+ if (needVal) {
+ List<ClusterNode> owners = cctx.topology().owners(e.partition(),
+ tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion());
+
+ // Do not preload if local node is partition owner.
+ if (owners.contains(cctx.localNode()))
+ needVal = false;
+ }
- // Skip entry if it is not new and is not present in updated mapping.
- if (tx != null && !needVal)
- continue;
+ // Skip entry if it is not new and is not present in updated mapping.
+ if (tx != null && !needVal)
+ continue;
- boolean invalidateRdr = e.readerId(n.id()) != null;
+ boolean invalidateRdr = e.readerId(n.id()) != null;
- req.addDhtKey(e.key(), invalidateRdr, cctx);
+ req.addDhtKey(e.key(), invalidateRdr, cctx);
- if (needVal) {
- // Mark last added key as needed to be preloaded.
- req.markLastKeyForPreload();
+ if (needVal) {
+ // Mark last added key as needed to be preloaded.
+ req.markLastKeyForPreload();
- if (tx != null) {
- IgniteTxEntry txEntry = tx.entry(e.txKey());
+ if (tx != null) {
+ IgniteTxEntry txEntry = tx.entry(e.txKey());
- // NOOP entries will be sent to backups on prepare step.
- if (txEntry.op() == GridCacheOperation.READ)
- txEntry.op(GridCacheOperation.NOOP);
+ // NOOP entries will be sent to backups on prepare step.
+ if (txEntry.op() == GridCacheOperation.READ)
+ txEntry.op(GridCacheOperation.NOOP);
+ }
+ }
+
+ GridCacheMvccCandidate added = e.candidate(lockVer);
+
+ assert added != null;
+ assert added.dhtLocal();
+
+ if (added.ownerVersion() != null)
+ req.owned(e.key(), added.ownerVersion());
+ }
+ catch (GridCacheEntryRemovedException ex) {
+ assert false : "Entry cannot become obsolete when DHT local candidate is added " +
+ "[e=" + e + ", ex=" + ex + ']';
}
}
- it.set(addOwned(req, e));
- }
-
- if (!F.isEmpty(req.keys())) {
- if (tx != null)
- tx.addLockTransactionNode(n);
+ if (!F.isEmpty(req.keys())) {
+ if (tx != null)
+ tx.addLockTransactionNode(n);
- add(fut); // Append new future.
+ add(fut); // Append new future.
- cctx.io().send(n, req, cctx.ioPolicy());
+ cctx.io().send(n, req, cctx.ioPolicy());
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DHT lock fut, sent request [txId=" + nearLockVer +
- ", dhtTxId=" + lockVer +
- ", inTx=" + inTx() +
- ", nodeId=" + n.id() + ']');
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DHT lock fut, sent request [txId=" + nearLockVer +
+ ", dhtTxId=" + lockVer +
+ ", inTx=" + inTx() +
+ ", nodeId=" + n.id() + ']');
+ }
}
}
- }
- catch (IgniteCheckedException e) {
- // Fail the whole thing.
- if (e instanceof ClusterTopologyCheckedException)
- fut.onResult();
- else {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DHT lock fut, failed to send request [txId=" + nearLockVer +
- ", dhtTxId=" + lockVer +
- ", inTx=" + inTx() +
- ", node=" + n.id() +
- ", err=" + e + ']');
- }
+ catch (IgniteCheckedException e) {
+ // Fail the whole thing.
+ if (e instanceof ClusterTopologyCheckedException)
+ fut.onResult();
+ else {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DHT lock fut, failed to send request [txId=" + nearLockVer +
+ ", dhtTxId=" + lockVer +
+ ", inTx=" + inTx() +
+ ", node=" + n.id() +
+ ", err=" + e + ']');
+ }
- fut.onResult(e);
+ fut.onResult(e);
+ }
}
}
}
@@ -978,35 +1041,6 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo
}
}
- /**
- * @param req Request.
- * @param e Entry.
- * @return Entry.
- */
- private GridDhtCacheEntry addOwned(GridDhtLockRequest req, GridDhtCacheEntry e) {
- while (true) {
- try {
- GridCacheMvccCandidate added = e.candidate(lockVer);
-
- assert added != null;
- assert added.dhtLocal();
-
- if (added.ownerVersion() != null)
- req.owned(e.key(), added.ownerVersion());
-
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry when creating DHT lock request (will retry): " + e);
-
- e = cctx.dht().entryExx(e.key(), topVer);
- }
- }
-
- return e;
- }
-
/** {@inheritDoc} */
@Override public int hashCode() {
return futId.hashCode();
@@ -1260,38 +1294,43 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo
if (cache0.isNear())
cache0 = ((GridNearCacheAdapter)cache0).dht();
- for (GridCacheEntryInfo info : res.preloadEntries()) {
- try {
- GridCacheEntryEx entry = cache0.entryEx(info.key(), topVer);
-
- cctx.shared().database().checkpointReadLock();
+ synchronized (GridDhtLockFuture.this) { // Prevents entry re-creation on concurrent rollback.
+ if (GridDhtLockFuture.this.checkDone())
+ return;
+ for (GridCacheEntryInfo info : res.preloadEntries()) {
try {
- if (entry.initialValue(info.value(),
- info.version(),
- info.ttl(),
- info.expireTime(),
- true, topVer,
- replicate ? DR_PRELOAD : DR_NONE,
- false)) {
- if (rec && !entry.isInternal())
- cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),
- (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null,
- false, null, null, null, false);
+ GridCacheEntryEx entry = cache0.entryEx(info.key(), topVer);
+
+ cctx.shared().database().checkpointReadLock();
+
+ try {
+ if (entry.initialValue(info.value(),
+ info.version(),
+ info.ttl(),
+ info.expireTime(),
+ true, topVer,
+ replicate ? DR_PRELOAD : DR_NONE,
+ false)) {
+ if (rec && !entry.isInternal())
+ cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),
+ (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null,
+ false, null, null, null, false);
+ }
+ }
+ finally {
+ cctx.shared().database().checkpointReadUnlock();
}
}
- finally {
- cctx.shared().database().checkpointReadUnlock();
- }
- }
- catch (IgniteCheckedException e) {
- onDone(e);
+ catch (IgniteCheckedException e) {
+ onDone(e);
- return;
- }
- catch (GridCacheEntryRemovedException e) {
- assert false : "Entry cannot become obsolete when DHT local candidate is added " +
- "[e=" + e + ", ex=" + e + ']';
+ return;
+ }
+ catch (GridCacheEntryRemovedException e) {
+ assert false : "Entry cannot become obsolete when DHT local candidate is added " +
+ "[e=" + e + ", ex=" + e + ']';
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 27e5d6f..e3e29b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.GridCacheLockTimeoutException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
@@ -61,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
+import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -74,6 +76,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
@@ -671,11 +674,16 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
f = lockAllAsync(ctx, nearNode, req, null);
// Register listener just so we print out errors.
- // Exclude lock timeout exception since it's not a fatal exception.
+ // Exclude lock timeout and rollback exceptions since it's not a fatal exception.
f.listen(CU.errorLogger(log, GridCacheLockTimeoutException.class,
- GridDistributedLockCancelledException.class));
+ GridDistributedLockCancelledException.class, IgniteTxTimeoutCheckedException.class,
+ IgniteTxRollbackCheckedException.class));
}
+ /**
+ * @param node Node.
+ * @param req Request.
+ */
private boolean waitForExchangeFuture(final ClusterNode node, final GridNearLockRequest req) {
assert req.firstClientRequest() : req;
@@ -824,6 +832,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
skipStore,
keepBinary);
+ if (fut.isDone()) // Possible in case of cancellation or timeout or rollback.
+ return fut;
+
for (KeyCacheObject key : keys) {
try {
while (true) {
@@ -832,7 +843,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
try {
fut.addEntry(entry);
- // Possible in case of cancellation or time out.
+ // Possible in case of cancellation or time out or rollback.
if (fut.isDone())
return fut;
@@ -859,9 +870,11 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
}
}
- ctx.mvcc().addFuture(fut);
+ if (!fut.isDone()) {
+ ctx.mvcc().addFuture(fut);
- fut.map();
+ fut.map();
+ }
return fut;
}
@@ -962,10 +975,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
}
}
- boolean timedout = false;
+ boolean timedOut = false;
for (KeyCacheObject key : keys) {
- if (timedout)
+ if (timedOut)
break;
while (true) {
@@ -980,7 +993,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
fut.addEntry(key == null ? null : entry);
if (fut.isDone()) {
- timedout = true;
+ timedOut = true;
break;
}
@@ -1079,7 +1092,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (tx != null)
tx.rollbackDhtLocal();
- return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
+ return new GridDhtFinishedFuture<>(new IgniteTxRollbackCheckedException(msg));
}
tx.topologyVersion(req.topologyVersion());
@@ -1117,7 +1130,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (e != null)
e = U.unwrap(e);
- assert !t.empty();
+ // Transaction can be emptied by asynchronous rollback.
+ assert e != null || !t.empty();
// Create response while holding locks.
final GridNearLockResponse resp = createLockReply(nearNode,
@@ -1296,13 +1310,11 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
null,
req.keepBinary());
- assert e.lockedBy(mappedVer) ||
- (ctx.mvcc().isRemoved(e.context(), mappedVer) && req.timeout() > 0) :
+ assert e.lockedBy(mappedVer) || ctx.mvcc().isRemoved(e.context(), mappedVer) :
"Entry does not own lock for tx [locNodeId=" + ctx.localNodeId() +
", entry=" + e +
", mappedVer=" + mappedVer + ", ver=" + ver +
- ", tx=" + tx + ", req=" + req +
- ", err=" + err + ']';
+ ", tx=" + tx + ", req=" + req + ']';
boolean filterPassed = false;
@@ -1384,12 +1396,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
Throwable err = res.error();
// Log error before sending reply.
- if (err != null && !(err instanceof GridCacheLockTimeoutException) && !ctx.kernalContext().isStopping())
+ if (err != null && !(err instanceof GridCacheLockTimeoutException) &&
+ !(err instanceof IgniteTxRollbackCheckedException) && !ctx.kernalContext().isStopping())
U.error(log, "Failed to acquire lock for request: " + req, err);
try {
- // Don't send reply message to this node or if lock was cancelled.
- if (!nearNode.id().equals(ctx.nodeId()) && !X.hasCause(err, GridDistributedLockCancelledException.class)) {
+ // Don't send reply message to this node or if lock was cancelled or tx was rolled back asynchronously.
+ if (!nearNode.id().equals(ctx.nodeId()) && !X.hasCause(err, GridDistributedLockCancelledException.class) &&
+ !X.hasCause(err, IgniteTxRollbackCheckedException.class)) {
ctx.io().send(nearNode, res, ctx.ioPolicy());
if (txLockMsgLog.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 0609f04..2e19df2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -395,13 +396,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
else
fut.prepare(req);
}
- catch (IgniteTxTimeoutCheckedException | IgniteTxOptimisticCheckedException e) {
+ catch (IgniteTxTimeoutCheckedException | IgniteTxRollbackCheckedException | IgniteTxOptimisticCheckedException e) {
fut.onError(e);
}
catch (IgniteCheckedException e) {
setRollbackOnly();
- fut.onError(new IgniteTxRollbackCheckedException("Failed to prepare transaction: " + this, e));
+ fut.onError(new IgniteTxRollbackCheckedException("Failed to prepare transaction: " + CU.txString(this), e));
}
return chainOnePhasePrepare(fut);
@@ -419,6 +420,30 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
IgniteCheckedException err = null;
+ if (!commit) {
+ final IgniteInternalFuture<?> lockFut = tryRollbackAsync();
+
+ if (lockFut != null) {
+ if (lockFut instanceof GridDhtLockFuture)
+ ((GridDhtLockFuture)lockFut).onError(rollbackException());
+ else {
+ /**
+ * Prevents race with {@link GridDhtTransactionalCacheAdapter#lockAllAsync
+ * (GridCacheContext, ClusterNode, GridNearLockRequest, CacheEntryPredicate[])}
+ */
+ final IgniteInternalFuture finalPrepFut = prepFut;
+
+ lockFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> ignored) {
+ finishTx(false, finalPrepFut, fut);
+ }
+ });
+
+ return;
+ }
+ }
+ }
+
if (!commit && prepFut != null) {
try {
prepFut.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index bed9243..604fe06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -46,7 +47,9 @@ import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -59,7 +62,6 @@ import org.jetbrains.annotations.Nullable;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;
import static org.apache.ignite.transactions.TransactionState.COMMITTING;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
@@ -75,6 +77,13 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/** */
private static final long serialVersionUID = 0L;
+ /** Asynchronous rollback marker for lock futures. */
+ protected static final IgniteInternalFuture<Boolean> ROLLBACK_FUT = new GridFutureAdapter<>();
+
+ /** Lock future updater. */
+ private static final AtomicReferenceFieldUpdater<GridDhtTxLocalAdapter, IgniteInternalFuture> LOCK_FUT_UPD =
+ AtomicReferenceFieldUpdater.newUpdater(GridDhtTxLocalAdapter.class, IgniteInternalFuture.class, "lockFut");
+
/** Near mappings. */
protected Map<UUID, GridDistributedTxMapping> nearMap = new ConcurrentHashMap<>();
@@ -96,6 +105,11 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/** Nodes where transactions were started on lock step. */
private Set<ClusterNode> lockTxNodes;
+ /** Enlist or lock future what is currently in progress. */
+ @SuppressWarnings("UnusedDeclaration")
+ @GridToStringExclude
+ protected volatile IgniteInternalFuture<Boolean> lockFut;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -561,69 +575,79 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
onePhaseCommit(onePhaseCommit);
try {
- Set<KeyCacheObject> skipped = null;
+ GridFutureAdapter<GridCacheReturn> enlistFut = new GridFutureAdapter<>();
- AffinityTopologyVersion topVer = topologyVersion();
+ if (!updateLockFuture(null, enlistFut))
+ return finishFuture(enlistFut, timedOut() ? timeoutException() : rollbackException(), false);
- GridDhtCacheAdapter dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
+ Set<KeyCacheObject> skipped = null;
- // Enlist locks into transaction.
- for (int i = 0; i < entries.size(); i++) {
- GridCacheEntryEx entry = entries.get(i);
+ try {
+ AffinityTopologyVersion topVer = topologyVersion();
- KeyCacheObject key = entry.key();
+ GridDhtCacheAdapter dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
- IgniteTxEntry txEntry = entry(entry.txKey());
+ // Enlist locks into transaction.
+ for (int i = 0; i < entries.size(); i++) {
+ GridCacheEntryEx entry = entries.get(i);
- // First time access.
- if (txEntry == null) {
- GridDhtCacheEntry cached;
+ KeyCacheObject key = entry.key();
- while (true) {
- try {
- cached = dhtCache.entryExx(key, topVer);
+ IgniteTxEntry txEntry = entry(entry.txKey());
- cached.unswap(read);
+ // First time access.
+ if (txEntry == null) {
+ GridDhtCacheEntry cached;
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Get removed entry: " + key);
- }
- }
-
- addActiveCache(dhtCache.context(), false);
+ while (true) {
+ try {
+ cached = dhtCache.entryExx(key, topVer);
- txEntry = addEntry(NOOP,
- null,
- null,
- null,
- cached,
- null,
- CU.empty0(),
- false,
- -1L,
- -1L,
- null,
- skipStore,
- keepBinary,
- nearCache);
-
- if (read)
- txEntry.ttl(accessTtl);
+ cached.unswap(read);
- txEntry.cached(cached);
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Get removed entry: " + key);
+ }
+ }
- addReader(msgId, cached, txEntry, topVer);
- }
- else {
- if (skipped == null)
- skipped = new GridLeanSet<>();
+ addActiveCache(dhtCache.context(), false);
+
+ txEntry = addEntry(NOOP,
+ null,
+ null,
+ null,
+ cached,
+ null,
+ CU.empty0(),
+ false,
+ -1L,
+ -1L,
+ null,
+ skipStore,
+ keepBinary,
+ nearCache);
+
+ if (read)
+ txEntry.ttl(accessTtl);
+
+ txEntry.cached(cached);
+
+ addReader(msgId, cached, txEntry, topVer);
+ }
+ else {
+ if (skipped == null)
+ skipped = new GridLeanSet<>();
- skipped.add(key);
+ skipped.add(key);
+ }
}
}
+ finally {
+ finishFuture(enlistFut, null, true);
+ }
assert pessimistic();
@@ -690,6 +714,9 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
if (timeout == -1)
return new GridFinishedFuture<>(timeoutException());
+ if (isRollbackOnly())
+ return new GridFinishedFuture<>(rollbackException());
+
IgniteInternalFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys,
timeout,
this,
@@ -826,6 +853,68 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
}
/**
+ * Atomically updates lock future.
+ *
+ * @param oldFut Old future.
+ * @param newFut New future.
+ *
+ * @return {@code true} If future was changed.
+ */
+ public boolean updateLockFuture(IgniteInternalFuture<?> oldFut, IgniteInternalFuture<?> newFut) {
+ return LOCK_FUT_UPD.compareAndSet(this, oldFut, newFut);
+ }
+
+ /**
+ * Clears lock future.
+ *
+ * @param cond Clear lock condition.
+ */
+ public void clearLockFuture(@Nullable IgniteInternalFuture cond) {
+ IgniteInternalFuture f = lockFut;
+
+ if (cond != null && f != cond)
+ return;
+
+ lockFut = null;
+ }
+
+ /**
+ *
+ * @param f Future to finish.
+ * @param err Error.
+ * @param clearLockFut {@code True} if need to clear lock future.
+ *
+ * @return Finished future.
+ */
+ public <T> GridFutureAdapter<T> finishFuture(GridFutureAdapter<T> f, Throwable err, boolean clearLockFut) {
+ if (clearLockFut)
+ clearLockFuture(null);
+
+ f.onDone(err);
+
+ return f;
+ }
+
+ /**
+ * Prepare async rollback.
+ *
+ * @return Current lock future or null if it's safe to roll back.
+ */
+ public @Nullable IgniteInternalFuture<?> tryRollbackAsync() {
+ IgniteInternalFuture<Boolean> fut;
+
+ while(true) {
+ fut = lockFut;
+
+ if (fut != null)
+ return fut == ROLLBACK_FUT ? null : fut;
+
+ if (updateLockFuture(null, ROLLBACK_FUT))
+ return null;
+ }
+ }
+
+ /**
* @param prepFut Prepare future.
* @return If transaction if finished on prepare step returns future which is completed after transaction finish.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index bde15c2..622774d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1040,7 +1040,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
readyLocks();
- if (timeoutObj != null) {
+ if (timeoutObj != null && !isDone()) {
// Start timeout tracking after 'readyLocks' to avoid race with timeout processing.
cctx.time().addTimeoutObject(timeoutObj);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index d334850..12819bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -103,6 +103,9 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
@GridDirectTransient
private List<IgniteTxKey> nearWritesCacheMissed;
+ /** {@code True} if remote tx should skip adding itself to completed versions map on finish. */
+ private boolean skipCompletedVers;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -169,6 +172,8 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
invalidateNearEntries = new BitSet(dhtWrites == null ? 0 : dhtWrites.size());
nearNodeId = tx.nearNodeId();
+
+ skipCompletedVers = tx.xidVersion() == tx.nearXidVersion();
}
/**
@@ -293,6 +298,13 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
}
/**
+ * @return {@code True} if remote tx should skip adding itself to completed versions map on finish.
+ */
+ public boolean skipCompletedVersion() {
+ return skipCompletedVers;
+ }
+
+ /**
* {@inheritDoc}
*
* @param ctx
@@ -443,18 +455,24 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
writer.incrementState();
case 29:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeBoolean("skipCompletedVers", skipCompletedVers))
return false;
writer.incrementState();
case 30:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 31:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 32:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -549,7 +567,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 29:
- subjId = reader.readUuid("subjId");
+ skipCompletedVers = reader.readBoolean("skipCompletedVers");
if (!reader.isLastRead())
return false;
@@ -557,7 +575,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 30:
- taskNameHash = reader.readInt("taskNameHash");
+ subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
return false;
@@ -565,6 +583,14 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 31:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 32:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -584,6 +610,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 32;
+ return 33;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 607f363..14d3866 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -194,7 +194,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (tx != null && !tx.implicit() && !skipTx) {
return asyncOp(tx, new AsyncOp<V>() {
@Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
- IgniteInternalFuture<Map<Object, Object>> fut = tx.getAllAsync(ctx,
+ IgniteInternalFuture<Map<Object, Object>> fut = tx.getAllAsync(ctx,
readyTopVer,
Collections.singleton(ctx.toCacheKeyObject(key)),
deserializeBinary,
@@ -425,7 +425,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (expiryPlc == null)
expiryPlc = expiryPolicy(null);
- // Optimisation: try to resolve value locally and escape 'get future' creation.
+ // Optimization: try to resolve value locally and escape 'get future' creation.
if (!forcePrimary && ctx.affinityNode()) {
try {
Map<K, V> locVals = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index c10f2ca..da0858f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -56,7 +57,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
@@ -237,6 +237,12 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
}
valMap = new ConcurrentHashMap<>();
+
+ if (tx != null && !tx.updateLockFuture(null, this)) {
+ onError(tx.timedOut() ? tx.timeoutException() : tx.rollbackException());
+
+ onComplete(false, false);
+ }
}
/** {@inheritDoc} */
@@ -405,7 +411,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
* @param success Success flag.
*/
public void complete(boolean success) {
- onComplete(success, true, true);
+ onComplete(success, true);
}
/**
@@ -450,8 +456,9 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
return;
}
+ // This warning can be triggered by deadlock detection code which clears pending futures.
U.warn(msgLog, "Collocated lock fut, failed to find mini future [txId=" + lockVer +
- ", inTx=" + inTx() +
+ ", tx=" + (inTx() ? CU.txString(tx) : "N/A") +
", node=" + nodeId +
", res=" + res +
", fut=" + this + ']');
@@ -536,12 +543,14 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
err = t;
}
- /** {@inheritDoc} */
+ /**
+ * Cancellation has special meaning for lock futures. It's called then lock must be released on rollback.
+ */
@Override public boolean cancel() {
- if (onCancelled())
- onComplete(false, true, true);
+ if (inTx())
+ onError(tx.rollbackException());
- return isCancelled();
+ return onComplete(false, true);
}
/** {@inheritDoc} */
@@ -562,7 +571,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
if (err != null)
success = false;
- return onComplete(success, true, true);
+ return onComplete(success, true);
}
/**
@@ -570,10 +579,9 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
*
* @param success {@code True} if lock was acquired.
* @param distribute {@code True} if need to distribute lock removal in case of failure.
- * @param restoreTimeout {@code True} if need restore tx timeout callback.
* @return {@code True} if complete by this operation.
*/
- private boolean onComplete(boolean success, boolean distribute, boolean restoreTimeout) {
+ private boolean onComplete(boolean success, boolean distribute) {
if (log.isDebugEnabled()) {
log.debug("Received onComplete(..) callback [success=" + success + ", distribute=" + distribute +
", fut=" + this + ']');
@@ -588,12 +596,8 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
if (tx != null) {
cctx.tm().txContext(tx);
- if (restoreTimeout && tx.trackTimeout()) {
- // Need restore timeout before onDone is called and next tx operation can proceed.
- boolean add = tx.addTimeoutHandler();
-
- assert add;
- }
+ if (success)
+ tx.clearLockFuture(this);
}
if (super.onDone(success, err)) {
@@ -694,23 +698,8 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
* part. Note that if primary node leaves grid, the future will fail and transaction will be rolled back.
*/
void map() {
- if (tx != null && tx.trackTimeout()) {
- if (!tx.removeTimeoutHandler()) {
- tx.finishFuture().listen(new IgniteInClosure<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
- IgniteTxTimeoutCheckedException err = new IgniteTxTimeoutCheckedException("Failed to " +
- "acquire lock, transaction was rolled back on timeout [timeout=" + tx.timeout() +
- ", tx=" + tx + ']');
-
- onError(err);
-
- onComplete(false, false, false);
- }
- });
-
- return;
- }
- }
+ if (isDone()) // Possible due to async rollback.
+ return;
if (timeout > 0) {
timeoutObj = new LockTimeoutObject();
@@ -973,7 +962,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
if (log.isDebugEnabled())
log.debug("Entry being locked did not pass filter (will not lock): " + entry);
- onComplete(false, false, true);
+ onComplete(false, false);
return;
}
@@ -1350,7 +1339,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
if (log.isDebugEnabled())
log.debug("Entry being locked did not pass filter (will not lock): " + entry);
- onComplete(false, false, true);
+ onComplete(false, false);
return false;
}
@@ -1430,44 +1419,48 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
if (log.isDebugEnabled())
log.debug("Timed out waiting for lock response: " + this);
- if (inTx() && cctx.tm().deadlockDetectionEnabled()) {
- synchronized (GridDhtColocatedLockFuture.this) {
- requestedKeys = requestedKeys0();
+ if (inTx()) {
+ if (cctx.tm().deadlockDetectionEnabled()) {
+ synchronized (GridDhtColocatedLockFuture.this) {
+ requestedKeys = requestedKeys0();
- clear(); // Stop response processing.
- }
+ clear(); // Stop response processing.
+ }
- Set<IgniteTxKey> keys = new HashSet<>();
+ Set<IgniteTxKey> keys = new HashSet<>();
- for (IgniteTxEntry txEntry : tx.allEntries()) {
- if (!txEntry.locked())
- keys.add(txEntry.txKey());
- }
+ for (IgniteTxEntry txEntry : tx.allEntries()) {
+ if (!txEntry.locked())
+ keys.add(txEntry.txKey());
+ }
- IgniteInternalFuture<TxDeadlock> fut = cctx.tm().detectDeadlock(tx, keys);
+ IgniteInternalFuture<TxDeadlock> fut = cctx.tm().detectDeadlock(tx, keys);
- fut.listen(new IgniteInClosure<IgniteInternalFuture<TxDeadlock>>() {
- @Override public void apply(IgniteInternalFuture<TxDeadlock> fut) {
- try {
- TxDeadlock deadlock = fut.get();
+ fut.listen(new IgniteInClosure<IgniteInternalFuture<TxDeadlock>>() {
+ @Override public void apply(IgniteInternalFuture<TxDeadlock> fut) {
+ try {
+ TxDeadlock deadlock = fut.get();
- if (deadlock != null)
- err = new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout for " +
- "transaction [timeout=" + tx.timeout() + ", tx=" + tx + ']',
- new TransactionDeadlockException(deadlock.toString(cctx.shared())));
- }
- catch (IgniteCheckedException e) {
- err = e;
+ err = new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided " +
+ "timeout for transaction [timeout=" + tx.timeout() + ", tx=" + CU.txString(tx) + ']',
+ deadlock != null ? new TransactionDeadlockException(deadlock.toString(cctx.shared())) :
+ null);
+ }
+ catch (IgniteCheckedException e) {
+ err = e;
- U.warn(log, "Failed to detect deadlock.", e);
- }
+ U.warn(log, "Failed to detect deadlock.", e);
+ }
- onComplete(false, true, true);
- }
- });
+ onComplete(false, true);
+ }
+ });
+ }
+ else
+ err = tx.timeoutException();
}
else
- onComplete(false, true, true);
+ onComplete(false, true);
}
/** {@inheritDoc} */
@@ -1716,4 +1709,4 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
return S.toString(MiniFuture.class, this, "node", node.id(), "super", super.toString());
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a21d98e..c8e0034 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -43,6 +43,7 @@ import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -1243,11 +1244,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
long nextDumpTime = 0;
- long futTimeout = 2 * cctx.gridConfig().getNetworkTimeout();
+ IgniteConfiguration cfg = cctx.gridConfig();
+
+ boolean rollbackEnabled = cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange() > 0;
+
+ long waitTimeout = 2 * cfg.getNetworkTimeout();
while (true) {
try {
- partReleaseFut.get(futTimeout, TimeUnit.MILLISECONDS);
+ partReleaseFut.get(rollbackEnabled ?
+ cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange() :
+ waitTimeout, TimeUnit.MILLISECONDS);
break;
}
@@ -1256,7 +1263,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (nextDumpTime <= U.currentTimeMillis()) {
dumpPendingObjects(partReleaseFut);
- nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, futTimeout);
+ nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, waitTimeout);
+ }
+
+ if (rollbackEnabled) {
+ rollbackEnabled = false;
+
+ cctx.tm().rollbackOnTopologyChange(initialVersion());
}
}
catch (IgniteCheckedException e) {
@@ -1286,7 +1299,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
while (true) {
try {
- locksFut.get(futTimeout, TimeUnit.MILLISECONDS);
+ locksFut.get(waitTimeout, TimeUnit.MILLISECONDS);
break;
}
@@ -1309,7 +1322,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet())
U.warn(log, "Awaited locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']');
- nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, futTimeout);
+ nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, waitTimeout);
if (getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false))
U.dumpThreads(log);
@@ -1326,7 +1339,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
try {
while (true) {
try {
- releaseLatch.await(futTimeout, TimeUnit.MILLISECONDS);
+ releaseLatch.await(waitTimeout, TimeUnit.MILLISECONDS);
if (log.isInfoEnabled())
log.info("Finished waiting for partitions release latch: " + releaseLatch);
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 6ff5e65..4b46bda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -52,7 +52,6 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCa
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
@@ -239,6 +238,12 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
log = U.logger(cctx.kernalContext(), logRef, GridNearLockFuture.class);
valMap = new ConcurrentHashMap<>();
+
+ if (tx != null && !tx.updateLockFuture(null, this)) {
+ err = tx.timedOut() ? tx.timeoutException() : tx.rollbackException();
+
+ onComplete(false, false);
+ }
}
/** {@inheritDoc} */
@@ -436,7 +441,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
* @param success Success flag.
*/
public void complete(boolean success) {
- onComplete(success, true, true);
+ onComplete(success, true);
}
/**
@@ -657,7 +662,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
log.debug("Local lock acquired for entries [fut=" + this + ", entries=" + entries + "]");
}
- onComplete(true, true, true);
+ onComplete(true, true);
return true;
}
@@ -665,12 +670,14 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
return false;
}
- /** {@inheritDoc} */
+ /**
+ * Cancellation has special meaning for lock futures. It's called then lock must be released on rollback.
+ */
@Override public boolean cancel() {
- if (onCancelled())
- onComplete(false, true, true);
+ if (inTx())
+ onError(tx.rollbackException());
- return isCancelled();
+ return onComplete(false, true);
}
/** {@inheritDoc} */
@@ -692,7 +699,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
if (err != null)
success = false;
- return onComplete(success, true, true);
+ return onComplete(success, true);
}
/**
@@ -700,10 +707,9 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
*
* @param success {@code True} if lock was acquired.
* @param distribute {@code True} if need to distribute lock removal in case of failure.
- * @param restoreTimeout {@code True} if need restore tx timeout callback.
* @return {@code True} if complete by this operation.
*/
- private boolean onComplete(boolean success, boolean distribute, boolean restoreTimeout) {
+ private boolean onComplete(boolean success, boolean distribute) {
if (log.isDebugEnabled()) {
log.debug("Received onComplete(..) callback [success=" + success + ", distribute=" + distribute +
", fut=" + this + ']');
@@ -718,12 +724,8 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
if (tx != null) {
cctx.tm().txContext(tx);
- if (restoreTimeout && tx.trackTimeout()) {
- // Need restore timeout before onDone is called and next tx operation can proceed.
- boolean add = tx.addTimeoutHandler();
-
- assert add;
- }
+ if (success)
+ tx.clearLockFuture(this);
}
if (super.onDone(success, err)) {
@@ -785,23 +787,8 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
* part. Note that if primary node leaves grid, the future will fail and transaction will be rolled back.
*/
void map() {
- if (tx != null && tx.trackTimeout()) {
- if (!tx.removeTimeoutHandler()) {
- tx.finishFuture().listen(new IgniteInClosure<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
- IgniteTxTimeoutCheckedException err = new IgniteTxTimeoutCheckedException("Failed to " +
- "acquire lock, transaction was rolled back on timeout [timeout=" + tx.timeout() +
- ", tx=" + tx + ']');
-
- onError(err);
-
- onComplete(false, false, false);
- }
- });
-
- return;
- }
- }
+ if (isDone()) // Possible due to async rollback.
+ return;
if (timeout > 0) {
timeoutObj = new LockTimeoutObject();
@@ -1020,7 +1007,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
if (log.isDebugEnabled())
log.debug("Entry being locked did not pass filter (will not lock): " + entry);
- onComplete(false, false, true);
+ onComplete(false, false);
return;
}
@@ -1476,44 +1463,48 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
timedOut = true;
- if (inTx() && cctx.tm().deadlockDetectionEnabled()) {
- synchronized (GridNearLockFuture.this) {
- requestedKeys = requestedKeys0();
+ if (inTx()) {
+ if (cctx.tm().deadlockDetectionEnabled()) {
+ synchronized (GridNearLockFuture.this) {
+ requestedKeys = requestedKeys0();
- clear(); // Stop response processing.
- }
+ clear(); // Stop response processing.
+ }
- Set<IgniteTxKey> keys = new HashSet<>();
+ Set<IgniteTxKey> keys = new HashSet<>();
- for (IgniteTxEntry txEntry : tx.allEntries()) {
- if (!txEntry.locked())
- keys.add(txEntry.txKey());
- }
+ for (IgniteTxEntry txEntry : tx.allEntries()) {
+ if (!txEntry.locked())
+ keys.add(txEntry.txKey());
+ }
- IgniteInternalFuture<TxDeadlock> fut = cctx.tm().detectDeadlock(tx, keys);
+ IgniteInternalFuture<TxDeadlock> fut = cctx.tm().detectDeadlock(tx, keys);
- fut.listen(new IgniteInClosure<IgniteInternalFuture<TxDeadlock>>() {
- @Override public void apply(IgniteInternalFuture<TxDeadlock> fut) {
- try {
- TxDeadlock deadlock = fut.get();
+ fut.listen(new IgniteInClosure<IgniteInternalFuture<TxDeadlock>>() {
+ @Override public void apply(IgniteInternalFuture<TxDeadlock> fut) {
+ try {
+ TxDeadlock deadlock = fut.get();
- if (deadlock != null)
- err = new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout for " +
- "transaction [timeout=" + tx.timeout() + ", tx=" + tx + ']',
- new TransactionDeadlockException(deadlock.toString(cctx.shared())));
- }
- catch (IgniteCheckedException e) {
- err = e;
+ err = new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided " +
+ "timeout for transaction [timeout=" + tx.timeout() + ", tx=" + CU.txString(tx) + ']',
+ deadlock != null ? new TransactionDeadlockException(deadlock.toString(cctx.shared())) :
+ null);
+ }
+ catch (IgniteCheckedException e) {
+ err = e;
- U.warn(log, "Failed to detect deadlock.", e);
- }
+ U.warn(log, "Failed to detect deadlock.", e);
+ }
- onComplete(false, true, true);
- }
- });
+ onComplete(false, true);
+ }
+ });
+ }
+ else
+ err = tx.timeoutException();
}
else
- onComplete(false, true, true);
+ onComplete(false, true);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 3e2c84a..974de6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -296,14 +296,12 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
if (!txStateCheck) {
if (tx.isRollbackOnly() || tx.setRollbackOnly()) {
if (tx.timedOut())
- onError(null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
- "was rolled back: " + this));
+ onDone(null, tx.timeoutException());
else
- onError(null, new IgniteCheckedException("Invalid transaction state for prepare " +
- "[state=" + tx.state() + ", tx=" + this + ']'));
+ onDone(null, tx.rollbackException());
}
else
- onError(null, new IgniteTxRollbackCheckedException("Invalid transaction state for " +
+ onDone(null, new IgniteCheckedException("Invalid transaction state for " +
"prepare [state=" + tx.state() + ", tx=" + this + ']'));
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 2afb096..247af84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
@@ -329,15 +330,13 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
if (!txStateCheck) {
if (tx.isRollbackOnly() || tx.setRollbackOnly()) {
if (tx.remainingTime() == -1)
- onError(new IgniteTxTimeoutCheckedException("Transaction timed out and " +
- "was rolled back: " + this), false);
+ onDone(tx.timeoutException());
else
- onError(new IgniteCheckedException("Invalid transaction state for prepare " +
- "[state=" + tx.state() + ", tx=" + this + ']'), false);
+ onDone(tx.rollbackException());
}
else
- onError(new IgniteTxRollbackCheckedException("Invalid transaction state for " +
- "prepare [state=" + tx.state() + ", tx=" + this + ']'), false);
+ onDone(new IgniteCheckedException("Invalid transaction state for " +
+ "prepare [state=" + tx.state() + ", tx=" + this + ']'));
return;
}
@@ -768,7 +767,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
U.warn(log, "Failed to detect deadlock.", e);
else {
e = new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout for " +
- "transaction [timeout=" + tx.timeout() + ", tx=" + tx + ']',
+ "transaction [timeout=" + tx.timeout() + ", tx=" + CU.txString(tx) + ']',
deadlock != null ? new TransactionDeadlockException(deadlock.toString(cctx)) : null);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 54ae85c..da9bdac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -157,13 +157,12 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
if (!tx.state(PREPARING)) {
if (tx.isRollbackOnly() || tx.setRollbackOnly()) {
if (tx.remainingTime() == -1)
- onDone(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + tx));
+ onDone(tx.timeoutException());
else
- onDone(new IgniteCheckedException("Invalid transaction state for prepare " +
- "[state=" + tx.state() + ", tx=" + this + ']'));
+ onDone(tx.rollbackException());
}
else
- onDone(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare " +
+ onDone(new IgniteCheckedException("Invalid transaction state for prepare " +
"[state=" + tx.state() + ", tx=" + this + ']'));
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFastFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFastFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFastFinishFuture.java
index 95e4ded..53d901a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFastFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFastFinishFuture.java
@@ -37,17 +37,13 @@ public class GridNearTxFastFinishFuture extends GridFutureAdapter<IgniteInternal
/** */
private final boolean commit;
- /** */
- private final boolean onTimeout;
-
/**
* @param tx Transaction.
* @param commit Commit flag.
*/
- GridNearTxFastFinishFuture(GridNearTxLocal tx, boolean commit, boolean onTimeout) {
+ GridNearTxFastFinishFuture(GridNearTxLocal tx, boolean commit) {
this.tx = tx;
this.commit = commit;
- this.onTimeout = onTimeout;
}
/** {@inheritDoc} */
@@ -56,9 +52,9 @@ public class GridNearTxFastFinishFuture extends GridFutureAdapter<IgniteInternal
}
/**
- *
+ * @param clearThreadMap {@code True} if need remove tx from thread map.
*/
- public void finish() {
+ public void finish(boolean clearThreadMap) {
try {
if (commit) {
tx.state(PREPARING);
@@ -74,7 +70,7 @@ public class GridNearTxFastFinishFuture extends GridFutureAdapter<IgniteInternal
tx.state(PREPARED);
tx.state(ROLLING_BACK);
- tx.context().tm().fastFinishTx(tx, false, !onTimeout);
+ tx.context().tm().fastFinishTx(tx, false, clearThreadMap);
tx.state(ROLLED_BACK);
}