You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/16 16:15:59 UTC
ignite git commit: ignite-4371 Avoid synchronous 'rollback' call from
system threads
Repository: ignite
Updated Branches:
refs/heads/master b2cc951e3 -> 0c782b0b6
ignite-4371 Avoid synchronous 'rollback' call from system threads
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0c782b0b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0c782b0b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0c782b0b
Branch: refs/heads/master
Commit: 0c782b0b6c210e837ed37ffb9b1eb5cdb7db5662
Parents: b2cc951
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 16 19:15:48 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 16 19:15:48 2016 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 37 +++---
.../processors/cache/GridCacheProcessor.java | 2 +-
.../processors/cache/GridCacheUtils.java | 10 +-
.../GridDistributedTxRemoteAdapter.java | 2 +-
.../distributed/dht/GridDhtTxFinishFuture.java | 74 +++++------
.../cache/distributed/dht/GridDhtTxLocal.java | 125 +++++++------------
.../distributed/dht/GridDhtTxLocalAdapter.java | 7 +-
.../near/GridNearTxFinishFuture.java | 44 +++----
.../cache/distributed/near/GridNearTxLocal.java | 48 ++++---
.../cache/transactions/IgniteTxHandler.java | 13 +-
.../transactions/IgniteTxLocalAdapter.java | 3 +-
.../cache/transactions/IgniteTxManager.java | 7 +-
.../GridCacheMissingCommitVersionSelfTest.java | 6 +-
.../IgniteTxStoreExceptionAbstractSelfTest.java | 1 +
14 files changed, 161 insertions(+), 218 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c782b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 8ea2169..ef8c994e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
@@ -852,24 +853,30 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
ready = cacheCtx.started();
if (ready) {
- GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
+ GridAffinityAssignmentCache affCache = cacheCtx.affinity().affinityCache();
- if (useOldApi) {
- locMap = new GridDhtPartitionFullMap(locMap.nodeId(),
- locMap.nodeOrder(),
- locMap.updateSequence(),
- locMap);
- }
+ if (affCache != null) {
+ GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
+
+ if (useOldApi) {
+ locMap = new GridDhtPartitionFullMap(locMap.nodeId(),
+ locMap.nodeOrder(),
+ locMap.updateSequence(),
+ locMap);
+ }
- addFullPartitionsMap(m,
- dupData,
- compress,
- cacheCtx.cacheId(),
- locMap,
- cacheCtx.affinity().affinityCache().similarAffinityKey());
+ addFullPartitionsMap(m,
+ dupData,
+ compress,
+ cacheCtx.cacheId(),
+ locMap,
+ affCache.similarAffinityKey());
- if (exchId != null)
- m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true));
+ if (exchId != null)
+ m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true));
+ }
+ else
+ assert cctx.cacheContext(cacheCtx.cacheId()) == null : cacheCtx.name();
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c782b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 0be2072..9487589 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1761,7 +1761,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @param req Stop request.
*/
- public void prepareCacheStop(DynamicCacheChangeRequest req) {
+ private void prepareCacheStop(DynamicCacheChangeRequest req) {
assert req.stop() || req.close() : req;
GridCacheAdapter<?, ?> cache = caches.remove(maskNull(req.cacheName()));
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c782b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 47abf2f..969c41a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -911,9 +911,13 @@ public class GridCacheUtils {
if (tx == null)
return "null";
- return tx.getClass().getSimpleName() + "[id=" + tx.xid() + ", concurrency=" + tx.concurrency() +
- ", isolation=" + tx.isolation() + ", state=" + tx.state() + ", invalidate=" + tx.isInvalidate() +
- ", rollbackOnly=" + tx.isRollbackOnly() + ", nodeId=" + tx.nodeId() +
+ return tx.getClass().getSimpleName() + "[id=" + tx.xid() +
+ ", concurrency=" + tx.concurrency() +
+ ", isolation=" + tx.isolation() +
+ ", state=" + tx.state() +
+ ", invalidate=" + tx.isInvalidate() +
+ ", rollbackOnly=" + tx.isRollbackOnly() +
+ ", nodeId=" + tx.nodeId() +
", duration=" + (U.currentTimeMillis() - tx.startTime()) + ']';
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c782b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 4adfa8b..68c0e57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -386,7 +386,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
// If another thread is doing prepare or rollback.
if (!state(PREPARING)) {
// In optimistic mode prepare may be called multiple times.
- if(state() != PREPARING || !optimistic()) {
+ if (state() != PREPARING || !optimistic()) {
if (log.isDebugEnabled())
log.debug("Invalid transaction state for prepare: " + this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c782b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index ac2ab41..147cbea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -33,8 +33,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
-import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -94,9 +92,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
/** Near mappings. */
private Map<UUID, GridDistributedTxMapping> nearMap;
- /** Trackable flag. */
- private boolean trackable = true;
-
/**
* @param cctx Context.
* @param tx Transaction.
@@ -151,46 +146,24 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
/** {@inheritDoc} */
@Override public boolean trackable() {
- return trackable;
+ return true;
}
/** {@inheritDoc} */
@Override public void markNotTrackable() {
- trackable = false;
+ assert false;
}
/**
* @param e Error.
*/
- public void onError(Throwable e) {
- if (ERR_UPD.compareAndSet(this, null, e)) {
- boolean marked = tx.setRollbackOnly();
-
- if (e instanceof IgniteTxRollbackCheckedException) {
- if (marked) {
- try {
- tx.rollback();
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to automatically rollback transaction: " + tx, ex);
- }
- }
- }
- else if (tx.isSystemInvalidate()) { // Invalidate remote transactions on heuristic error.
- finish();
+ public void rollbackOnError(Throwable e) {
+ assert e != null;
- try {
- get();
- }
- catch (IgniteTxHeuristicCheckedException ignore) {
- // Future should complete with GridCacheTxHeuristicException.
- }
- catch (IgniteCheckedException err) {
- U.error(log, "Failed to invalidate transaction: " + tx, err);
- }
- }
+ if (ERR_UPD.compareAndSet(this, null, e)) {
+ tx.setRollbackOnly();
- onComplete();
+ finish(false);
}
}
@@ -240,12 +213,21 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
/** {@inheritDoc} */
@Override public boolean onDone(IgniteInternalTx tx, Throwable err) {
if (initialized() || err != null) {
- if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING))
- this.tx.tmFinish(err == null);
-
Throwable e = this.err;
- if (e == null && commit)
+ if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING)) {
+ try {
+ this.tx.tmFinish(err == null);
+ }
+ catch (IgniteCheckedException finishErr) {
+ U.error(log, "Failed to finish tx: " + tx, e);
+
+ if (e == null)
+ e = finishErr;
+ }
+ }
+
+ if (commit && e == null)
e = this.tx.commitError();
Throwable finishErr = e != null ? e : err;
@@ -255,7 +237,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
finishErr = this.tx.commitError();
if (this.tx.syncMode() != PRIMARY_SYNC)
- this.tx.sendFinishReply(commit, finishErr);
+ this.tx.sendFinishReply(finishErr);
// Don't forget to clean up.
cctx.mvcc().removeFuture(futId);
@@ -284,13 +266,15 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
/**
* Initializes future.
+ *
+ * @param commit Commit flag.
*/
@SuppressWarnings({"SimplifiableIfStatement", "IfMayBeConditional"})
- public void finish() {
+ public void finish(boolean commit) {
boolean sync;
if (!F.isEmpty(dhtMap) || !F.isEmpty(nearMap))
- sync = finish(dhtMap, nearMap);
+ sync = finish(commit, dhtMap, nearMap);
else if (!commit && !F.isEmpty(tx.lockTransactionNodes()))
sync = rollbackLockTransactions(tx.lockTransactionNodes());
else
@@ -308,7 +292,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
* @return {@code True} in case there is at least one synchronous {@code MiniFuture} to wait for.
*/
private boolean rollbackLockTransactions(Collection<ClusterNode> nodes) {
- assert !commit;
assert !F.isEmpty(nodes);
if (tx.onePhaseCommit())
@@ -337,7 +320,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.commitVersion(),
tx.threadId(),
tx.isolation(),
- commit,
+ false,
tx.isInvalidate(),
tx.system(),
tx.ioPolicy(),
@@ -390,11 +373,14 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
}
/**
+ * @param commit Commit flag.
* @param dhtMap DHT map.
* @param nearMap Near map.
* @return {@code True} in case there is at least one synchronous {@code MiniFuture} to wait for.
*/
- private boolean finish(Map<UUID, GridDistributedTxMapping> dhtMap, Map<UUID, GridDistributedTxMapping> nearMap) {
+ private boolean finish(boolean commit,
+ Map<UUID, GridDistributedTxMapping> dhtMap,
+ Map<UUID, GridDistributedTxMapping> nearMap) {
if (tx.onePhaseCommit())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c782b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index b659abb..4e39e9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -503,52 +503,57 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
/**
+ * @param commit Commit flag.
* @param prepFut Prepare future.
* @param fut Finish future.
*/
- private void finishCommit(@Nullable IgniteInternalFuture prepFut, GridDhtTxFinishFuture fut) {
+ private void finishTx(boolean commit, @Nullable IgniteInternalFuture prepFut, GridDhtTxFinishFuture fut) {
+ assert prepFut == null || prepFut.isDone();
+
boolean primarySync = syncMode() == PRIMARY_SYNC;
IgniteCheckedException err = null;
- try {
- if (prepFut != null)
- prepFut.get(); // Check for errors.
-
- if (finish(true)) {
- if (primarySync)
- sendFinishReply(true, null);
-
- fut.finish();
+ if (!commit && prepFut != null) {
+ try {
+ prepFut.get();
}
- else {
- err = new IgniteCheckedException("Failed to commit transaction: " + CU.txString(this));
-
- fut.onError(err);
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to prepare transaction [tx=" + this + ", e=" + e + ']');
+ }
+ finally {
+ prepFut = null;
}
}
- catch (IgniteTxOptimisticCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to optimistically prepare transaction [tx=" + this + ", e=" + e + ']');
- err = e;
+ try {
+ if (prepFut != null)
+ prepFut.get(); // Check for errors.
- fut.onError(e);
+ boolean finished = finish(commit);
+
+ if (!finished)
+ err = new IgniteCheckedException("Failed to finish transaction [commit=" + commit +
+ ", tx=" + CU.txString(this) + ']');
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to prepare transaction: " + this, e);
+ U.error(log, "Failed to finish transaction [commit=" + commit + ", tx=" + this + ']', e);
err = e;
-
- fut.onError(e);
}
- if (primarySync && err != null)
- sendFinishReply(true, err);
+ if (primarySync)
+ sendFinishReply(err);
+
+ if (err != null)
+ fut.rollbackOnError(err);
+ else
+ fut.finish(commit);
}
/** {@inheritDoc} */
- @SuppressWarnings({"ThrowableInstanceNeverThrown"})
+ @SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
if (log.isDebugEnabled())
log.debug("Committing dht local tx: " + this);
@@ -557,7 +562,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
if (pessimistic())
prepareAsync();
- final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*commit*/true);
+ final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, true);
cctx.mvcc().addFuture(fut, fut.futureId());
@@ -565,11 +570,11 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
if (prep != null) {
if (prep.isDone())
- finishCommit(prep, fut);
+ finishTx(true, prep, fut);
else {
prep.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> f) {
- finishCommit(f, fut);
+ finishTx(true, f, fut);
}
});
}
@@ -577,7 +582,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
else {
assert optimistic();
- finishCommit(null, fut);
+ finishTx(true, null, fut);
}
return fut;
@@ -590,70 +595,26 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
PREP_FUT_UPD.compareAndSet(this, fut, null);
}
- /**
- * @param prepFut Prepare future.
- * @param fut Finish future.
- */
- private void finishRollback(@Nullable IgniteInternalFuture prepFut, GridDhtTxFinishFuture fut) {
- try {
- if (prepFut != null)
- prepFut.get();
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to prepare or rollback transaction [tx=" + this + ", e=" + e + ']');
- }
-
- boolean primarySync = syncMode() == PRIMARY_SYNC;
-
- IgniteCheckedException err = null;
-
- try {
- if (finish(false) || state() == UNKNOWN) {
- if (primarySync)
- sendFinishReply(false, null);
-
- fut.finish();
- }
- else {
- err = new IgniteCheckedException("Failed to rollback transaction: " +
- CU.txString(GridDhtTxLocal.this));
-
- fut.onError(err);
- }
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to gracefully rollback transaction: " + CU.txString(GridDhtTxLocal.this),
- e);
-
- err = e;
-
- fut.onError(e);
- }
-
- if (primarySync && err != null)
- sendFinishReply(false, err);
- }
-
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() {
- GridDhtTxPrepareFuture prepFut = this.prepFut;
-
- final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*rollback*/false);
+ final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, false);
cctx.mvcc().addFuture(fut, fut.futureId());
+ GridDhtTxPrepareFuture prepFut = this.prepFut;
+
if (prepFut != null) {
prepFut.complete();
prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> f) {
- finishRollback(f, fut);
+ finishTx(false, f, fut);
}
});
}
else
- finishRollback(null, fut);
+ finishTx(false, null, fut);
return fut;
}
@@ -672,7 +633,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
/** {@inheritDoc} */
- @Override protected void sendFinishReply(boolean commit, @Nullable Throwable err) {
+ @Override protected void sendFinishReply(@Nullable Throwable err) {
if (nearFinFutId != null) {
if (nearNodeId.equals(cctx.localNodeId())) {
if (log.isDebugEnabled())
@@ -701,8 +662,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
}
catch (Throwable ex) {
- U.error(log, "Failed to send finish response to node (transaction was " +
- (commit ? "committed" : "rolledback") + ") [txId=" + nearXidVersion() +
+ U.error(log, "Failed to send finish response to node [txId=" + nearXidVersion() +
+ ", txState=" + state() +
", dhtTxId=" + xidVersion() +
", node=" + nearNodeId +
", res=" + res + ']', ex);
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c782b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 35dfb62..1d88d84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -236,10 +236,9 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
AffinityTopologyVersion topVer);
/**
- * @param commit Commit flag.
* @param err Error, if any.
*/
- protected abstract void sendFinishReply(boolean commit, @Nullable Throwable err);
+ protected abstract void sendFinishReply(@Nullable Throwable err);
/** {@inheritDoc} */
@Override public boolean needsCompletedVersions() {
@@ -249,7 +248,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/**
* @return Versions for all pending locks that were in queue before tx locks were released.
*/
- public Collection<GridCacheVersion> pendingVersions() {
+ Collection<GridCacheVersion> pendingVersions() {
return pendingVers == null ? Collections.<GridCacheVersion>emptyList() : pendingVers;
}
@@ -726,7 +725,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/*read*/read,
accessTtl,
filter == null ? CU.empty0() : filter,
- /**computeInvoke*/false);
+ /*computeInvoke*/false);
return ret;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c782b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 54bd543..9acab56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -298,34 +298,16 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (isDone())
return false;
- if (err != null) {
- tx.commitError(err);
-
- boolean marked = tx.setRollbackOnly();
-
- if (err instanceof IgniteTxRollbackCheckedException) {
- if (marked) {
- try {
- tx.rollback();
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to automatically rollback transaction: " + tx, ex);
- }
- }
- }
- else if (tx.implicit() && tx.isSystemInvalidate()) { // Finish implicit transaction on heuristic error.
- try {
- tx.close();
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to invalidate transaction: " + tx, ex);
- }
- }
+ if (err != null)
+ tx.setRollbackOnly();
+
+ if (commit) {
+ if (tx.commitError() != null)
+ err = tx.commitError();
+ else if (err != null)
+ tx.commitError(err);
}
- if (commit && tx.commitError() != null)
- err = tx.commitError();
-
if (initialized() || err != null) {
if (tx.needCheckBackup()) {
assert tx.onePhaseCommit();
@@ -349,7 +331,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
finishOnePhase(commit);
- tx.tmFinish(commit);
+ try {
+ tx.tmFinish(commit);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to finish tx: " + tx, e);
+
+ if (err == null)
+ err = e;
+ }
}
if (super.onDone(tx0, err)) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c782b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index ed37059..0730300 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -231,7 +231,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
/** {@inheritDoc} */
- @Override protected void sendFinishReply(boolean commit, @Nullable Throwable err) {
+ @Override protected void sendFinishReply(@Nullable Throwable err) {
// We are in near transaction, do not send finish reply to local node.
}
@@ -1062,50 +1062,48 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
return new GridFinishedFuture<IgniteInternalTx>(this);
}
- final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*commit*/true);
+ final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, true);
cctx.mvcc().addFuture(fut, fut.futureId());
if (prep == null || prep.isDone()) {
assert prep != null || optimistic();
+ IgniteCheckedException err = null;
+
try {
if (prep != null)
prep.get(); // Check for errors of a parent future.
-
- fut.finish();
- }
- catch (IgniteTxOptimisticCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed optimistically to prepare transaction [tx=" + this + ", e=" + e + ']');
-
- fut.onError(e);
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to prepare transaction: " + this, e);
+ err = e;
- fut.onError(e);
+ U.error(log, "Failed to prepare transaction: " + this, e);
}
+
+ if (err != null)
+ fut.rollbackOnError(err);
+ else
+ fut.finish(true);
}
else
prep.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> f) {
+ IgniteCheckedException err = null;
+
try {
f.get(); // Check for errors of a parent future.
-
- fut.finish();
- }
- catch (IgniteTxOptimisticCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed optimistically to prepare transaction [tx=" + this + ", e=" + e + ']');
-
- fut.onError(e);
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to prepare transaction: " + this, e);
+ err = e;
- fut.onError(e);
+ U.error(log, "Failed to prepare transaction: " + this, e);
}
+
+ if (err != null)
+ fut.rollbackOnError(err);
+ else
+ fut.finish(true);
}
});
@@ -1121,7 +1119,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
if (log.isDebugEnabled())
log.debug("Rolling back colocated tx locally: " + this);
- final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*commit*/false);
+ final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, false);
cctx.mvcc().addFuture(fut, fut.futureId());
@@ -1138,7 +1136,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
e.getMessage() + ']');
}
- fut.finish();
+ fut.finish(false);
}
else
prep.listen(new CI1<IgniteInternalFuture<?>>() {
@@ -1151,7 +1149,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
e.getMessage() + ']');
}
- fut.finish();
+ fut.finish(false);
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c782b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 2706d4d..eaf1c87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -472,13 +472,8 @@ public class IgniteTxHandler {
req.last());
if (tx.isRollbackOnly() && !tx.commitOnPrepare()) {
- try {
- if (tx.state() != TransactionState.ROLLED_BACK && tx.state() != TransactionState.ROLLING_BACK)
- tx.rollback();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to rollback transaction: " + tx, e);
- }
+ if (tx.state() != TransactionState.ROLLED_BACK && tx.state() != TransactionState.ROLLING_BACK)
+ tx.rollbackAsync();
}
final GridDhtTxLocal tx0 = tx;
@@ -872,7 +867,7 @@ public class IgniteTxHandler {
U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e);
- IgniteInternalFuture<IgniteInternalTx> res = null;
+ IgniteInternalFuture<IgniteInternalTx> res;
IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync();
@@ -884,7 +879,7 @@ public class IgniteTxHandler {
if (e instanceof Error)
throw (Error)e;
- return res == null ? new GridFinishedFuture<IgniteInternalTx>(e) : res;
+ return res;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c782b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index ba44655..0327247 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1000,8 +1000,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
* Commits transaction to transaction manager. Used for one-phase commit transactions only.
*
* @param commit If {@code true} commits transaction, otherwise rollbacks.
+ * @throws IgniteCheckedException If failed.
*/
- public void tmFinish(boolean commit) {
+ public void tmFinish(boolean commit) throws IgniteCheckedException {
assert onePhaseCommit();
if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c782b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index bd8e18b..24f8ea9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1192,8 +1192,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* Commits a transaction.
*
* @param tx Transaction to commit.
+ * @throws IgniteCheckedException If failed.
*/
- public void commitTx(IgniteInternalTx tx) {
+ public void commitTx(IgniteInternalTx tx) throws IgniteCheckedException {
assert tx != null;
assert tx.state() == COMMITTING : "Invalid transaction state for commit from tm [state=" + tx.state() +
", expected=COMMITTING, tx=" + tx + ']';
@@ -1211,12 +1212,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE);
// 1. Make sure that committed version has been recorded.
- if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
+ if (!(committed || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
uncommitTx(tx);
tx.errorWhenCommitting();
- throw new IgniteException("Missing commit version (consider increasing " +
+ throw new IgniteCheckedException("Missing commit version (consider increasing " +
IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() +
", tx=" + tx.getClass().getSimpleName() + ']');
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c782b0b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java
index 19e49f3..ac56d18 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java
@@ -43,7 +43,7 @@ public class GridCacheMissingCommitVersionSelfTest extends GridCommonAbstractTes
private volatile boolean putFailed;
/** */
- private String maxCompletedTxCount;
+ private String maxCompletedTxCnt;
/**
*/
@@ -53,7 +53,7 @@ public class GridCacheMissingCommitVersionSelfTest extends GridCommonAbstractTes
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration() throws Exception {
- maxCompletedTxCount = System.getProperty(IGNITE_MAX_COMPLETED_TX_COUNT);
+ maxCompletedTxCnt = System.getProperty(IGNITE_MAX_COMPLETED_TX_COUNT);
System.setProperty(IGNITE_MAX_COMPLETED_TX_COUNT, String.valueOf(5));
@@ -78,7 +78,7 @@ public class GridCacheMissingCommitVersionSelfTest extends GridCommonAbstractTes
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
- System.setProperty(IGNITE_MAX_COMPLETED_TX_COUNT, maxCompletedTxCount != null ? maxCompletedTxCount : "");
+ System.setProperty(IGNITE_MAX_COMPLETED_TX_COUNT, maxCompletedTxCnt != null ? maxCompletedTxCnt : "");
super.afterTest();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c782b0b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
index b65b441..795ab81 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
@@ -357,6 +357,7 @@ public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAb
/**
* @param key Key.
+ * @param putBefore If {@code true} expects non-null values.
* @throws Exception If failed.
*/
private void checkValue(final Integer key, boolean putBefore) throws Exception {