You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/15 08:00:36 UTC
[02/46] ignite git commit: IGNITE-264 - Check backup node for
one-phase transaction when primary node crashes.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index 23a8b1f..25ab297 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -63,7 +63,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
/** Implicit transaction with one key flag. */
private boolean implicitSingleTx;
- /** One phase commit flag. */
+ /** Flag is kept for backward compatibility. */
private boolean onePhaseCommit;
/** Array of mapped DHT versions for this entry. */
@@ -220,20 +220,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
}
/**
- * @return One phase commit flag.
- */
- public boolean onePhaseCommit() {
- return onePhaseCommit;
- }
-
- /**
- * @param onePhaseCommit One phase commit flag.
- */
- public void onePhaseCommit(boolean onePhaseCommit) {
- this.onePhaseCommit = onePhaseCommit;
- }
-
- /**
* @return Sync commit flag.
*/
public boolean syncCommit() {
@@ -603,4 +589,4 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
return S.toString(GridNearLockRequest.class, this, "filter", Arrays.toString(filter),
"super", super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 529f29c..3f9decf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -24,6 +24,7 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
@@ -50,6 +51,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -142,6 +144,16 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
* @param e Error.
*/
void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping> mappings, Throwable e) {
+ if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) {
+ if (tx.onePhaseCommit()) {
+ tx.markForBackupCheck();
+
+ onComplete();
+
+ return;
+ }
+ }
+
if (err.compareAndSet(null, e)) {
boolean marked = tx.setRollbackOnly();
@@ -209,17 +221,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
this.err.compareAndSet(null, err);
- if (err == null)
- tx.state(PREPARED);
-
- if (super.onDone(tx, err)) {
- // Don't forget to clean up.
- cctx.mvcc().removeFuture(this);
-
- return true;
- }
-
- return false;
+ return onComplete();
}
/**
@@ -233,10 +235,20 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
/**
* Completeness callback.
*/
- private void onComplete() {
- if (super.onDone(tx, err.get()))
+ private boolean onComplete() {
+ Throwable err0 = err.get();
+
+ if (err0 == null || tx.needCheckBackup())
+ tx.state(PREPARED);
+
+ if (super.onDone(tx, err0)) {
// Don't forget to clean up.
cctx.mvcc().removeFuture(this);
+
+ return true;
+ }
+
+ return false;
}
/** {@inheritDoc} */
@@ -244,6 +256,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
// Obtain the topology version to use.
AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+ // If there is another system transaction in progress, use it's topology version to prevent deadlock.
+ if (topVer == null && tx != null && tx.system()) {
+ IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
+
+ if (tx0 != null)
+ topVer = tx0.topologyVersionSnapshot();
+ }
+
if (topVer != null) {
tx.topologyVersion(topVer);
@@ -437,7 +457,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
/**
* @param reads Read entries.
* @param writes Write entries.
- * @throws IgniteCheckedException If transaction is group-lock and some key was mapped to to the local node.
*/
private void prepare(
Iterable<IgniteTxEntry> reads,
@@ -618,7 +637,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
* @param topVer Topology version.
* @param cur Current mapping.
* @param waitLock Wait lock flag.
- * @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key.
* @return Mapping.
*/
private GridDistributedTxMapping map(
@@ -626,7 +644,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
AffinityTopologyVersion topVer,
@Nullable GridDistributedTxMapping cur,
boolean waitLock
- ) throws IgniteCheckedException {
+ ) {
GridCacheContext cacheCtx = entry.context();
List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer);
@@ -857,4 +875,4 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 63f1f79..b8d2250 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -261,7 +261,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
err = this.err.get();
- if (err == null)
+ if (err == null || tx.needCheckBackup())
tx.state(PREPARED);
if (super.onDone(tx, err)) {
@@ -339,6 +339,13 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
* @param e Error.
*/
void onNodeLeft(ClusterTopologyCheckedException e) {
+ if (tx.onePhaseCommit()) {
+ tx.markForBackupCheck();
+
+ // Do not fail future for one-phase transaction right away.
+ onDone(tx);
+ }
+
onError(e);
}
@@ -366,4 +373,4 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index f29bd28..ddc8be5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
@@ -32,6 +34,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -42,14 +46,17 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.transactions.TransactionRollbackException;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
-import static org.apache.ignite.transactions.TransactionState.COMMITTING;
+import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
/**
*
@@ -57,6 +64,9 @@ import static org.apache.ignite.transactions.TransactionState.COMMITTING;
public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFuture<IgniteInternalTx>
implements GridCacheFuture<IgniteInternalTx> {
/** */
+ public static final IgniteProductVersion FINISH_NEAR_ONE_PHASE_SINCE = IgniteProductVersion.fromString("1.4.0");
+
+ /** */
private static final long serialVersionUID = 0L;
/** Logger reference. */
@@ -216,27 +226,66 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
}
+ /**
+ * @param nodeId Sender.
+ * @param res Result.
+ */
+ public void onResult(UUID nodeId, GridDhtTxFinishResponse res) {
+ if (!isDone())
+ for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) {
+ if (isMini(fut)) {
+ MiniFuture f = (MiniFuture)fut;
+
+ if (f.futureId().equals(res.miniId())) {
+ assert f.node().id().equals(nodeId);
+
+ f.onResult(res);
+ }
+ }
+ }
+ }
+
/** {@inheritDoc} */
- @Override public boolean onDone(IgniteInternalTx tx, Throwable err) {
+ @Override public boolean onDone(IgniteInternalTx tx0, Throwable err) {
if ((initialized() || err != null)) {
- if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING))
- this.tx.tmCommit();
+ if (tx.needCheckBackup()) {
+ assert tx.onePhaseCommit();
+
+ if (err != null)
+ err = new TransactionRollbackException("Failed to commit transaction.", err);
+
+ try {
+ tx.finish(err == null);
+ }
+ catch (IgniteCheckedException e) {
+ if (err != null)
+ err.addSuppressed(e);
+ else
+ err = e;
+ }
+ }
+
+ if (tx.onePhaseCommit()) {
+ finishOnePhase();
+
+ tx.tmFinish(err == null);
+ }
Throwable th = this.err.get();
- if (super.onDone(tx, th != null ? th : err)) {
+ if (super.onDone(tx0, th != null ? th : err)) {
if (error() instanceof IgniteTxHeuristicCheckedException) {
- AffinityTopologyVersion topVer = this.tx.topologyVersion();
+ AffinityTopologyVersion topVer = tx.topologyVersion();
- for (IgniteTxEntry e : this.tx.writeMap().values()) {
+ for (IgniteTxEntry e : tx.writeMap().values()) {
GridCacheContext cacheCtx = e.context();
try {
if (e.op() != NOOP && !cacheCtx.affinity().localNode(e.key(), topVer)) {
- GridCacheEntryEx Entry = cacheCtx.cache().peekEx(e.key());
+ GridCacheEntryEx entry = cacheCtx.cache().peekEx(e.key());
- if (Entry != null)
- Entry.invalidate(null, this.tx.xidVersion());
+ if (entry != null)
+ entry.invalidate(null, tx.xidVersion());
}
}
catch (Throwable t) {
@@ -284,52 +333,194 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* Initializes future.
*/
void finish() {
- if (tx.onePhaseCommit()) {
- // No need to send messages as transaction was already committed on remote node.
- // Finish local mapping only as we need send commit message to backups.
- for (GridDistributedTxMapping m : mappings.values()) {
- if (m.node().isLocal()) {
- IgniteInternalFuture<IgniteInternalTx> fut = cctx.tm().txHandler().finishColocatedLocal(commit, tx);
-
- // Add new future.
- if (fut != null)
- add(fut);
- }
- }
+ if (tx.needCheckBackup()) {
+ assert tx.onePhaseCommit();
+
+ checkBackup();
+ // If checkBackup is set, it means that primary node has crashed and we will not need to send
+ // finish request to it, so we can mark future as initialized.
markInitialized();
return;
}
- if (mappings != null) {
- finish(mappings.values());
+ try {
+ if (tx.finish(commit) || (!commit && tx.state() == UNKNOWN)) {
+ if ((tx.onePhaseCommit() && needFinishOnePhase()) || (!tx.onePhaseCommit() && mappings != null))
+ finish(mappings.values());
- markInitialized();
+ markInitialized();
- if (!isSync()) {
- boolean complete = true;
+ if (!isSync()) {
+ boolean complete = true;
- for (IgniteInternalFuture<?> f : pending())
- // Mini-future in non-sync mode gets done when message gets sent.
- if (isMini(f) && !f.isDone())
- complete = false;
+ for (IgniteInternalFuture<?> f : pending())
+ // Mini-future in non-sync mode gets done when message gets sent.
+ if (isMini(f) && !f.isDone())
+ complete = false;
- if (complete)
- onComplete();
+ if (complete)
+ onComplete();
+ }
}
+ else
+ onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(tx)));
}
- else {
- assert !commit;
+ catch (Error | RuntimeException e) {
+ onError(e);
- try {
- tx.rollback();
+ throw e;
+ }
+ catch (IgniteCheckedException e) {
+ onError(e);
+ }
+ }
+
+ /**
+ *
+ */
+ private void checkBackup() {
+ assert mappings.size() <= 1;
+
+ for (Map.Entry<UUID, GridDistributedTxMapping> entry : mappings.entrySet()) {
+ UUID nodeId = entry.getKey();
+ GridDistributedTxMapping mapping = entry.getValue();
+
+ Collection<UUID> backups = tx.transactionNodes().get(nodeId);
+
+ if (!F.isEmpty(backups)) {
+ assert backups.size() == 1;
+
+ UUID backupId = F.first(backups);
+
+ ClusterNode backup = cctx.discovery().node(backupId);
+
+ // Nothing to do if backup has left the grid.
+ if (backup == null)
+ return;
+
+ MiniFuture mini = new MiniFuture(backup, mapping);
+
+ add(mini);
+
+ if (backup.isLocal()) {
+ if (cctx.tm().txHandler().checkDhtRemoteTxCommitted(tx.xidVersion())) {
+ readyNearMappingFromBackup(mapping);
+
+ mini.onDone(tx);
+ }
+ else
+ mini.onDone(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
+ "(transaction has been rolled back on backup node): " + tx.xidVersion()));
+ }
+ else {
+ GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
+ cctx.localNodeId(),
+ futureId(),
+ mini.futureId(),
+ tx.topologyVersion(),
+ tx.xidVersion(),
+ tx.commitVersion(),
+ tx.threadId(),
+ tx.isolation(),
+ true,
+ false,
+ tx.system(),
+ tx.ioPolicy(),
+ false,
+ true,
+ true,
+ null,
+ null,
+ null,
+ null,
+ 0,
+ null,
+ 0);
+
+ finishReq.checkCommitted(true);
+
+ try {
+ if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(backup.version()) <= 0)
+ cctx.io().send(backup, finishReq, tx.ioPolicy());
+ else
+ mini.onDone(new IgniteTxHeuristicCheckedException("Failed to check for tx commit on " +
+ "the backup node (node has an old Ignite version) [rmtNodeId=" + backup.id() +
+ ", ver=" + backup.version() + ']'));
+ }
+ catch (ClusterTopologyCheckedException e) {
+ mini.onResult(e);
+ }
+ catch (IgniteCheckedException e) {
+ mini.onResult(e);
+ }
+ }
}
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to rollback empty transaction: " + tx, e);
+ else
+ readyNearMappingFromBackup(mapping);
+ }
+ }
+
+ /**
+ *
+ */
+ private boolean needFinishOnePhase() {
+ if (F.isEmpty(tx.mappings()))
+ return false;
+
+ assert tx.mappings().size() == 1;
+
+ boolean finish = false;
+
+ for (Integer cacheId : tx.activeCacheIds()) {
+ GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+
+ if (cacheCtx.isNear()) {
+ finish = true;
+
+ break;
}
+ }
- markInitialized();
+ if (finish) {
+ GridDistributedTxMapping mapping = F.first(tx.mappings().values());
+
+ if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(mapping.node().version()) > 0)
+ finish = false;
+ }
+
+ return finish;
+ }
+
+ /**
+ *
+ */
+ private void finishOnePhase() {
+ // No need to send messages as transaction was already committed on remote node.
+ // Finish local mapping only as we need send commit message to backups.
+ for (GridDistributedTxMapping m : mappings.values()) {
+ if (m.node().isLocal()) {
+ IgniteInternalFuture<IgniteInternalTx> fut = cctx.tm().txHandler().finishColocatedLocal(commit, tx);
+
+ // Add new future.
+ if (fut != null)
+ add(fut);
+ }
+ }
+ }
+
+ /**
+ * @param mapping Mapping to finish.
+ */
+ private void readyNearMappingFromBackup(GridDistributedTxMapping mapping) {
+ if (mapping.near()) {
+ GridCacheVersion xidVer = tx.xidVersion();
+
+ mapping.dhtVersion(xidVer, xidVer);
+
+ tx.readyNearLocks(mapping, Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList(),
+ Collections.<GridCacheVersion>emptyList());
}
}
@@ -417,9 +608,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
@SuppressWarnings("unchecked")
@Override public String apply(IgniteInternalFuture<?> f) {
if (isMini(f)) {
- return "[node=" + ((MiniFuture) f).node().id() +
- ", loc=" + ((MiniFuture) f).node().isLocal() +
- ", done=" + f.isDone() + "]";
+ MiniFuture m = (MiniFuture)f;
+
+ return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]";
}
else
return "[loc=true, done=" + f.isDone() + "]";
@@ -446,6 +637,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
@GridToStringInclude
private GridDistributedTxMapping m;
+ /** Backup check flag. */
+ private ClusterNode backup;
+
/**
* @param m Mapping.
*/
@@ -454,6 +648,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/**
+ * @param backup Backup to check.
+ * @param m Mapping associated with the backup.
+ */
+ MiniFuture(ClusterNode backup, GridDistributedTxMapping m) {
+ this.backup = backup;
+ this.m = m;
+ }
+
+ /**
* @return Future ID.
*/
IgniteUuid futureId() {
@@ -464,7 +667,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* @return Node ID.
*/
public ClusterNode node() {
- return m.node();
+ assert m != null || backup != null;
+
+ return backup != null ? backup : m.node();
}
/**
@@ -492,20 +697,42 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (log.isDebugEnabled())
log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this);
- // Complete future with tx.
- onDone(tx);
+ if (backup != null) {
+ readyNearMappingFromBackup(m);
+
+ onDone(e);
+ }
+ else
+ // Complete future with tx.
+ onDone(tx);
}
/**
* @param res Result callback.
*/
void onResult(GridNearTxFinishResponse res) {
+ assert backup == null;
+
if (res.error() != null)
onDone(res.error());
else
onDone(tx);
}
+ /**
+ * @param res Response.
+ */
+ void onResult(GridDhtTxFinishResponse res) {
+ assert backup != null;
+
+ readyNearMappingFromBackup(m);
+
+ if (res.checkCommittedError() != null)
+ onDone(res.checkCommittedError());
+ else
+ onDone(tx);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index 5e5a222..c52a127 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -97,8 +97,22 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
int txSize,
@Nullable UUID subjId,
int taskNameHash) {
- super(xidVer, futId, null, threadId, commit, invalidate, sys, plc, syncCommit, syncRollback, baseVer,
- committedVers, rolledbackVers, txSize);
+ super(
+ xidVer,
+ futId,
+ null,
+ threadId,
+ commit,
+ invalidate,
+ sys,
+ plc,
+ syncCommit,
+ syncRollback,
+ baseVer,
+ committedVers,
+ rolledbackVers,
+ txSize
+ );
this.explicitLock = explicitLock;
this.storeEnabled = storeEnabled;
@@ -290,4 +304,4 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
@Override public String toString() {
return GridToStringBuilder.toString(GridNearTxFinishRequest.class, this, "super", super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index fd64fdf..a4e06c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -114,6 +114,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
private Map<IgniteTxKey, IgniteCacheExpiryPolicy> accessMap;
/** */
+ private boolean needCheckBackup;
+
+ /** */
private boolean hasRemoteLocks;
/**
@@ -164,6 +167,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
timeout,
false,
storeEnabled,
+ false,
txSize,
subjId,
taskNameHash);
@@ -242,6 +246,20 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
/**
+ * Marks transaction to check if commit on backup.
+ */
+ public void markForBackupCheck() {
+ needCheckBackup = true;
+ }
+
+ /**
+ * @return If need to check tx commit on backup.
+ */
+ public boolean needCheckBackup() {
+ return needCheckBackup;
+ }
+
+ /**
* Checks if transaction is fully synchronous.
*
* @return {@code True} if transaction is fully synchronous.
@@ -501,9 +519,15 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
GridDistributedTxMapping m = mappings.get(n.id());
- if (m == null)
+ if (m == null) {
m = F.addIfAbsent(mappings, n.id(), new GridDistributedTxMapping(n));
+ m.near(map.near());
+
+ if (map.explicitLock())
+ m.markExplicitLock();
+ }
+
assert m != null;
for (IgniteTxEntry entry : map.entries())
@@ -736,7 +760,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
cctx.mvcc().addFuture(fut);
- IgniteInternalFuture<?> prepareFut = prepFut.get();
+ final IgniteInternalFuture<?> prepareFut = prepFut.get();
prepareFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> f) {
@@ -744,17 +768,15 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
try {
// Make sure that here are no exceptions.
- f.get();
+ prepareFut.get();
- if (finish(true))
- fut0.finish();
- else
- fut0.onError(new IgniteCheckedException("Failed to commit transaction: " +
- CU.txString(GridNearTxLocal.this)));
+ fut0.finish();
}
catch (Error | RuntimeException e) {
commitErr.compareAndSet(null, e);
+ fut0.onError(e);
+
throw e;
}
catch (IgniteCheckedException e) {
@@ -796,15 +818,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
log.debug("Got optimistic tx failure [tx=" + this + ", err=" + e + ']');
}
- try {
- if (finish(false) || state() == UNKNOWN)
- fut.finish();
- else
- fut.onError(new IgniteCheckedException("Failed to gracefully rollback transaction: " + CU.txString(this)));
- }
- catch (IgniteCheckedException e) {
- fut.onError(e);
- }
+ fut.finish();
}
else {
prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
@@ -820,19 +834,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
GridNearTxFinishFuture fut0 = rollbackFut.get();
- try {
- if (finish(false) || state() == UNKNOWN)
- fut0.finish();
- else
- fut0.onError(new IgniteCheckedException("Failed to gracefully rollback transaction: " +
- CU.txString(GridNearTxLocal.this)));
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to gracefully rollback transaction: " +
- CU.txString(GridNearTxLocal.this), e);
-
- fut0.onError(e);
- }
+ fut0.finish();
}
});
}
@@ -1253,4 +1255,4 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
@Override public String toString() {
return S.toString(GridNearTxLocal.class, this, "mappings", mappings.keySet(), "super", super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index 4395198..87c68b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -106,8 +106,22 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
@Nullable UUID subjId,
int taskNameHash
) throws IgniteCheckedException {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout,
- txSize, subjId, taskNameHash);
+ super(
+ ctx,
+ nodeId,
+ rmtThreadId,
+ xidVer,
+ commitVer,
+ sys,
+ plc,
+ concurrency,
+ isolation,
+ invalidate,
+ timeout,
+ txSize,
+ subjId,
+ taskNameHash
+ );
assert nearNodeId != null;
@@ -162,8 +176,22 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
@Nullable UUID subjId,
int taskNameHash
) {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout,
- txSize, subjId, taskNameHash);
+ super(
+ ctx,
+ nodeId,
+ rmtThreadId,
+ xidVer,
+ commitVer,
+ sys,
+ plc,
+ concurrency,
+ isolation,
+ invalidate,
+ timeout,
+ txSize,
+ subjId,
+ taskNameHash
+ );
assert nearNodeId != null;
@@ -383,4 +411,4 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
@Override public String toString() {
return GridToStringBuilder.toString(GridNearTxRemote.class, this, "super", super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 1ad1064..88752a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -211,6 +211,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
private AtomicBoolean preparing = new AtomicBoolean();
/** */
+ @GridToStringInclude
private Map<Integer, Set<Integer>> invalidParts = new HashMap<>(3);
/**
@@ -293,6 +294,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
long timeout,
boolean invalidate,
boolean storeEnabled,
+ boolean onePhaseCommit,
int txSize,
@Nullable UUID subjId,
int taskNameHash
@@ -312,6 +314,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
this.timeout = timeout;
this.invalidate = invalidate;
this.storeEnabled = storeEnabled;
+ this.onePhaseCommit = onePhaseCommit;
this.txSize = txSize;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
@@ -2210,4 +2213,4 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
return S.toString(TxShadow.class, this);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index d64b26e..9efa43a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -59,24 +59,24 @@ import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.lang.GridClosureException;
-import org.apache.ignite.internal.util.typedef.C2;
+import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFutureCancelledException;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
+import static org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture.FINISH_NEAR_ONE_PHASE_SINCE;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
@@ -211,41 +211,40 @@ public class IgniteTxHandler {
final GridNearTxLocal locTx,
final GridNearTxPrepareRequest req
) {
- IgniteInternalFuture<Object> fut = new GridFinishedFuture<>(); // TODO force preload keys.
-
- return new GridEmbeddedFuture<>(
- fut,
- new C2<Object, Exception, IgniteInternalFuture<GridNearTxPrepareResponse>>() {
- @Override public IgniteInternalFuture<GridNearTxPrepareResponse> apply(Object o, Exception ex) {
- if (ex != null)
- throw new GridClosureException(ex);
-
- IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(
- req.reads(),
- req.writes(),
- req.transactionNodes(),
- req.last(),
- req.lastBackups());
-
- if (locTx.isRollbackOnly())
- locTx.rollbackAsync();
-
- return fut;
+ IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(
+ req.reads(),
+ req.writes(),
+ req.transactionNodes(),
+ req.last(),
+ req.lastBackups());
+
+ if (locTx.isRollbackOnly())
+ locTx.rollbackAsync();
+
+ return fut.chain(new C1<IgniteInternalFuture<GridNearTxPrepareResponse>, GridNearTxPrepareResponse>() {
+ @Override public GridNearTxPrepareResponse apply(IgniteInternalFuture<GridNearTxPrepareResponse> f) {
+ try {
+ return f.get();
}
- },
- new C2<GridNearTxPrepareResponse, Exception, GridNearTxPrepareResponse>() {
- @Nullable @Override public GridNearTxPrepareResponse apply(GridNearTxPrepareResponse res, Exception e) {
- if (e != null) {
- locTx.setRollbackOnly(); // Just in case.
-
- if (!(e instanceof IgniteTxOptimisticCheckedException))
- U.error(log, "Failed to prepare transaction: " + locTx, e);
- }
+ catch (Exception e) {
+ locTx.setRollbackOnly(); // Just in case.
+
+ if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) &&
+ !X.hasCause(e, IgniteFutureCancelledException.class))
+ U.error(log, "Failed to prepare DHT transaction: " + locTx, e);
- return res;
+ return new GridNearTxPrepareResponse(
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.version(),
+ req.version(),
+ null,
+ e,
+ null);
}
}
- );
+ });
}
/**
@@ -303,7 +302,7 @@ public class IgniteTxHandler {
GridDhtPartitionTopology top = null;
if (req.firstClientRequest()) {
- assert req.concurrency().equals(OPTIMISTIC) : req;
+ assert req.concurrency() == OPTIMISTIC : req;
assert CU.clientNode(nearNode) : nearNode;
top = firstEntry.context().topology();
@@ -333,7 +332,7 @@ public class IgniteTxHandler {
try {
ctx.io().send(nearNode, res, req.policy());
}
- catch (ClusterTopologyCheckedException e) {
+ catch (ClusterTopologyCheckedException ignored) {
if (log.isDebugEnabled())
log.debug("Failed to send client tx remap response, client node failed " +
"[node=" + nearNode + ", req=" + req + ']');
@@ -363,6 +362,7 @@ public class IgniteTxHandler {
req.timeout(),
req.isInvalidate(),
true,
+ req.onePhaseCommit(),
req.txSize(),
req.transactionNodes(),
req.subjectId(),
@@ -389,6 +389,10 @@ public class IgniteTxHandler {
tx.transactionNodes(req.transactionNodes());
+ // Set near on originating node flag only if the sender node has new version.
+ if (req.near() && FINISH_NEAR_ONE_PHASE_SINCE.compareTo(nearNode.version()) <= 0)
+ tx.nearOnOriginatingNode(true);
+
if (req.onePhaseCommit()) {
assert req.last();
assert F.isEmpty(req.lastBackups()) || req.lastBackups().size() <= 1;
@@ -428,7 +432,8 @@ public class IgniteTxHandler {
catch (IgniteCheckedException e) {
tx0.setRollbackOnly(); // Just in case.
- if (!(e instanceof IgniteTxOptimisticCheckedException))
+ if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) &&
+ !X.hasCause(e, IgniteFutureCancelledException.class))
U.error(log, "Failed to prepare DHT transaction: " + tx0, e);
}
}
@@ -527,17 +532,32 @@ public class IgniteTxHandler {
assert nodeId != null;
assert res != null;
- GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.xid(),
- res.futureId());
+ if (res.checkCommitted()) {
+ GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(
+ res.xid(), res.futureId());
- if (fut == null) {
- if (log.isDebugEnabled())
- log.debug("Received response for unknown future (will ignore): " + res);
+ if (fut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Received response for unknown future (will ignore): " + res);
- return;
+ return;
+ }
+
+ fut.onResult(nodeId, res);
}
+ else {
+ GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(
+ res.xid(), res.futureId());
- fut.onResult(nodeId, res);
+ if (fut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Received response for unknown future (will ignore): " + res);
+
+ return;
+ }
+
+ fut.onResult(nodeId, res);
+ }
}
/**
@@ -653,42 +673,10 @@ public class IgniteTxHandler {
}
try {
- if (req.commit()) {
- if (tx == null) {
- // Create transaction and add entries.
- tx = ctx.tm().onCreated(null,
- new GridDhtTxLocal(
- ctx,
- nodeId,
- req.version(),
- req.futureId(),
- req.miniId(),
- req.threadId(),
- true,
- false, /* we don't know, so assume false. */
- req.system(),
- req.explicitLock(),
- req.policy(),
- PESSIMISTIC,
- READ_COMMITTED,
- /*timeout */0,
- req.isInvalidate(),
- req.storeEnabled(),
- req.txSize(),
- null,
- req.subjectId(),
- req.taskNameHash()));
-
- if (tx == null || !ctx.tm().onStarted(tx))
- throw new IgniteTxRollbackCheckedException("Attempt to start a completed transaction: " + req);
-
- tx.topologyVersion(req.topologyVersion());
- }
- else {
- if (req.explicitLock())
- tx.explicitLock(req.explicitLock());
- }
+ assert tx != null : "Transaction is null for near finish request [nodeId=" +
+ nodeId + ", req=" + req + "]";
+ if (req.commit()) {
tx.storeEnabled(req.storeEnabled());
if (!tx.markFinalizing(USER_FINISH)) {
@@ -712,47 +700,17 @@ public class IgniteTxHandler {
return commitFut;
}
else {
- assert tx != null || req.explicitLock() : "Transaction is null for near rollback request [nodeId=" +
- nodeId + ", req=" + req + "]";
-
- if (tx != null) {
- tx.syncRollback(req.syncRollback());
-
- tx.nearFinishFutureId(req.futureId());
- tx.nearFinishMiniId(req.miniId());
+ tx.syncRollback(req.syncRollback());
- IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync();
-
- // Only for error logging.
- rollbackFut.listen(CU.errorLogger(log));
-
- return rollbackFut;
- }
- else {
- // Always send finish response.
- GridCacheMessage res = new GridNearTxFinishResponse(req.version(), req.threadId(),
- req.futureId(), req.miniId(), null);
+ tx.nearFinishFutureId(req.futureId());
+ tx.nearFinishMiniId(req.miniId());
- try {
- ctx.io().send(nodeId, res, req.policy());
- }
- catch (Throwable e) {
- // Double-check.
- if (ctx.discovery().node(nodeId) == null) {
- if (log.isDebugEnabled())
- log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res +
- ']');
- }
- else
- U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", " +
- "res=" + res + ']', e);
+ IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync();
- if (e instanceof Error)
- throw (Error)e;
- }
+ // Only for error logging.
+ rollbackFut.listen(CU.errorLogger(log));
- return null;
- }
+ return rollbackFut;
}
}
catch (Throwable e) {
@@ -912,6 +870,12 @@ public class IgniteTxHandler {
if (log.isDebugEnabled())
log.debug("Processing dht tx finish request [nodeId=" + nodeId + ", req=" + req + ']');
+ if (req.checkCommitted()) {
+ sendReply(nodeId, req, checkDhtRemoteTxCommitted(req.version()));
+
+ return;
+ }
+
GridDhtTxRemote dhtTx = ctx.tm().tx(req.version());
GridNearTxRemote nearTx = ctx.tm().nearTx(req.version());
@@ -947,13 +911,33 @@ public class IgniteTxHandler {
completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
@Override
public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) {
- sendReply(nodeId, req);
+ sendReply(nodeId, req, true);
}
});
}
else
- sendReply(nodeId, req);
+ sendReply(nodeId, req, true);
}
+ else
+ sendReply(nodeId, req, true);
+ }
+
+ /**
+ * Checks whether DHT remote transaction with given version has been committed. If not, will add version
+ * to rollback version set so that late response will not falsely commit this transaction.
+ *
+ * @param writeVer Write version to check.
+ * @return {@code True} if transaction has been committed, {@code false} otherwise.
+ */
+ public boolean checkDhtRemoteTxCommitted(GridCacheVersion writeVer) {
+ assert writeVer != null;
+
+ boolean committed = true;
+
+ if (ctx.tm().addRolledbackTx(writeVer))
+ committed = false;
+
+ return committed;
}
/**
@@ -988,15 +972,14 @@ public class IgniteTxHandler {
try {
if (req.commit() || req.isSystemInvalidate()) {
- if (tx.commitVersion(req.commitVersion())) {
- tx.invalidate(req.isInvalidate());
- tx.systemInvalidate(req.isSystemInvalidate());
+ tx.commitVersion(req.commitVersion());
+ tx.invalidate(req.isInvalidate());
+ tx.systemInvalidate(req.isSystemInvalidate());
- // Complete remote candidates.
- tx.doneRemote(req.baseVersion(), null, null, null);
+ // Complete remote candidates.
+ tx.doneRemote(req.baseVersion(), null, null, null);
- tx.commit();
- }
+ tx.commit();
}
else {
tx.doneRemote(req.baseVersion(), null, null, null);
@@ -1067,23 +1050,33 @@ public class IgniteTxHandler {
* @param nodeId Node id that originated finish request.
* @param req Request.
*/
- protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req) {
- GridCacheMessage res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
+ protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed) {
+ if (req.replyRequired()) {
+ GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
- try {
- ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
- }
- catch (Throwable e) {
- // Double-check.
- if (ctx.discovery().node(nodeId) == null) {
- if (log.isDebugEnabled())
- log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res + ']');
+ if (req.checkCommitted()) {
+ res.checkCommitted(true);
+
+ if (!committed)
+ res.checkCommittedError(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
+ "(transaction has been rolled back on backup node): " + req.version()));
}
- else
- U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", res=" + res + ']', e);
- if (e instanceof Error)
- throw (Error)e;
+ try {
+ ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ }
+ catch (Throwable e) {
+ // Double-check.
+ if (ctx.discovery().node(nodeId) == null) {
+ if (log.isDebugEnabled())
+ log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res + ']');
+ }
+ else
+ U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", res=" + res + ']', e);
+
+ if (e instanceof Error)
+ throw (Error)e;
+ }
}
}
@@ -1188,8 +1181,14 @@ public class IgniteTxHandler {
// in prepare phase will get properly ordered as well.
tx.prepare();
- if (req.last())
+ if (req.last()) {
+ assert !F.isEmpty(req.transactionNodes()) :
+ "Received last prepare request with empty transaction nodes: " + req;
+
+ tx.transactionNodes(req.transactionNodes());
+
tx.state(PREPARED);
+ }
res.invalidPartitionsByCacheId(tx.invalidPartitions());
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 7d32401..00b91dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -196,12 +196,29 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
long timeout,
boolean invalidate,
boolean storeEnabled,
+ boolean onePhaseCommit,
int txSize,
@Nullable UUID subjId,
int taskNameHash
) {
- super(cctx, xidVer, implicit, implicitSingle, /*local*/true, sys, plc, concurrency, isolation, timeout,
- invalidate, storeEnabled, txSize, subjId, taskNameHash);
+ super(
+ cctx,
+ xidVer,
+ implicit,
+ implicitSingle,
+ /*local*/true,
+ sys,
+ plc,
+ concurrency,
+ isolation,
+ timeout,
+ invalidate,
+ storeEnabled,
+ onePhaseCommit,
+ txSize,
+ subjId,
+ taskNameHash
+ );
minVer = xidVer;
}
@@ -986,6 +1003,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
log.debug("Ignoring READ entry when committing: " + txEntry);
}
else {
+ assert ownsLock(txEntry.cached()):
+ "Transaction does not own lock for group lock entry during commit [tx=" +
+ this + ", txEntry=" + txEntry + ']';
+
if (conflictCtx == null || !conflictCtx.isUseOld()) {
if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
cached.updateTtl(null, txEntry.ttl());
@@ -1085,14 +1106,17 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/**
* Commits transaction to transaction manager. Used for one-phase commit transactions only.
*/
- public void tmCommit() {
+ public void tmFinish(boolean commit) {
assert onePhaseCommit();
if (doneFlag.compareAndSet(false, true)) {
// Unlock all locks.
- cctx.tm().commitTx(this);
+ if (commit)
+ cctx.tm().commitTx(this);
+ else
+ cctx.tm().rollbackTx(this);
- state(COMMITTED);
+ state(commit ? COMMITTED : ROLLED_BACK);
boolean needsCompletedVersions = needsCompletedVersions();
@@ -1386,7 +1410,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
null,
skipStore);
-
// As optimization, mark as checked immediately
// for non-pessimistic if value is not null.
if (val != null && !pessimistic())
@@ -3785,4 +3808,4 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
*/
abstract T finish(T t) throws IgniteCheckedException;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index b28225d..4074eee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -415,7 +415,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
TransactionIsolation isolation,
long timeout,
boolean storeEnabled,
- int txSize) {
+ int txSize
+ ) {
assert sysCacheCtx == null || sysCacheCtx.systemTx();
UUID subjId = null; // TODO GG-9141 how to get subj ID?
@@ -692,7 +693,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/**
* @return Any transaction associated with the current thread.
*/
- public IgniteInternalTx anyActiveThreadTx() {
+ public IgniteInternalTx anyActiveThreadTx(IgniteInternalTx ignore) {
long threadId = Thread.currentThread().getId();
IgniteInternalTx tx = threadMap.get(threadId);
@@ -706,7 +707,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId()));
- if (tx != null && tx.topologyVersionSnapshot() != null)
+ if (tx != null && tx != ignore && tx.topologyVersionSnapshot() != null)
return tx;
}
@@ -1067,7 +1068,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @return If transaction was not already present in committed set.
*/
public boolean addCommittedTx(IgniteInternalTx tx) {
- return addCommittedTx(tx.xidVersion(), tx.nearXidVersion());
+ boolean res = addCommittedTx(tx.xidVersion(), tx.nearXidVersion());
+
+ if (!tx.local() && !tx.near() && tx.onePhaseCommit())
+ addCommittedTx(tx.nearXidVersion(), null);
+
+ return res;
}
/**
@@ -1261,9 +1267,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
uncommitTx(tx);
+ GridCacheVersion first = completedVers.isEmpty() ? null : completedVers.firstKey();
+ GridCacheVersion last = completedVers.isEmpty() ? null : completedVers.lastKey();
+
throw new IgniteException("Missing commit version (consider increasing " +
IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + ", firstVer=" +
- completedVers.firstKey() + ", lastVer=" + completedVers.lastKey() + ", tx=" + tx.xid() + ']');
+ first + ", lastVer=" + last + ", tx=" + tx.xid() + ']');
}
ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx);
@@ -1786,13 +1795,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- * @param ver Version.
+ * @param xidVer Version.
* @return Future for flag indicating if transactions was committed.
*/
- public IgniteInternalFuture<Boolean> txCommitted(GridCacheVersion ver) {
+ public IgniteInternalFuture<Boolean> txCommitted(GridCacheVersion xidVer) {
final GridFutureAdapter<Boolean> resFut = new GridFutureAdapter<>();
- final IgniteInternalTx tx = cctx.tm().tx(ver);
+ final IgniteInternalTx tx = cctx.tm().tx(xidVer);
if (tx != null) {
assert tx.near() && tx.local() : tx;
@@ -1814,7 +1823,22 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
return resFut;
}
- Boolean committed = completedVers.get(ver);
+ Boolean committed = null;
+
+ for (Map.Entry<GridCacheVersion, Boolean> entry : completedVers.entrySet()) {
+ if (entry.getValue() == null)
+ continue;
+
+ if (entry.getKey() instanceof CommittedVersion) {
+ CommittedVersion comm = (CommittedVersion)entry.getKey();
+
+ if (comm.nearVer.equals(xidVer)) {
+ committed = entry.getValue();
+
+ break;
+ }
+ }
+ }
if (log.isDebugEnabled())
log.debug("Near transaction committed: " + committed);
@@ -2030,9 +2054,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
try {
cctx.kernalContext().gateway().readLock();
}
- catch (IllegalStateException | IgniteClientDisconnectedException ignore) {
+ catch (IllegalStateException | IgniteClientDisconnectedException e) {
if (log.isDebugEnabled())
- log.debug("Failed to acquire kernal gateway [err=" + ignore + ']');
+ log.debug("Failed to acquire kernal gateway [err=" + e + ']');
return;
}
@@ -2213,4 +2237,4 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 0025f6bf..a5561e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -492,18 +492,28 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
if (!create)
return c.applyx();
- try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
+ while (true) {
+ try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
- if (err != null)
- throw err;
+ if (err != null)
+ throw err;
- dataStructure = c.applyx();
+ dataStructure = c.applyx();
- tx.commit();
- }
+ tx.commit();
+
+ return dataStructure;
+ }
+ catch (ClusterTopologyCheckedException e) {
+ IgniteInternalFuture<?> fut = e.retryReadyFuture();
- return dataStructure;
+ fut.get();
+ }
+ catch (IgniteTxRollbackCheckedException ignore) {
+ // Safe to retry right away.
+ }
+ }
}
/**
@@ -559,31 +569,39 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
if (err != null)
throw err;
- T rmvInfo;
+ while (true) {
+ try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ T2<Boolean, IgniteCheckedException> res =
+ utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get();
- try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- T2<Boolean, IgniteCheckedException> res =
- utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get();
+ err = res.get2();
- err = res.get2();
+ if (err != null)
+ throw err;
- if (err != null)
- throw err;
+ assert res.get1() != null;
- assert res.get1() != null;
+ boolean exists = res.get1();
- boolean exists = res.get1();
+ if (!exists)
+ return;
- if (!exists)
- return;
+ T rmvInfo = c.applyx();
- rmvInfo = c.applyx();
+ tx.commit();
- tx.commit();
- }
+ if (afterRmv != null && rmvInfo != null)
+ afterRmv.applyx(rmvInfo);
+ }
+ catch (ClusterTopologyCheckedException e) {
+ IgniteInternalFuture<?> fut = e.retryReadyFuture();
- if (afterRmv != null && rmvInfo != null)
- afterRmv.applyx(rmvInfo);
+ fut.get();
+ }
+ catch (IgniteTxRollbackCheckedException ignore) {
+ // Safe to retry right away.
+ }
+ }
}
/**
@@ -953,27 +971,35 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
return c.applyx(cacheCtx);
}
- T col;
+ while (true) {
+ try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ T2<String, IgniteCheckedException> res =
+ utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get();
- try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- T2<String, IgniteCheckedException> res =
- utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get();
+ err = res.get2();
- err = res.get2();
+ if (err != null)
+ throw err;
- if (err != null)
- throw err;
+ String cacheName = res.get1();
- String cacheName = res.get1();
+ final GridCacheContext cacheCtx = ctx.cache().internalCache(cacheName).context();
- final GridCacheContext cacheCtx = ctx.cache().internalCache(cacheName).context();
+ T col = c.applyx(cacheCtx);
- col = c.applyx(cacheCtx);
+ tx.commit();
- tx.commit();
- }
+ return col;
+ }
+ catch (ClusterTopologyCheckedException e) {
+ IgniteInternalFuture<?> fut = e.retryReadyFuture();
- return col;
+ fut.get();
+ }
+ catch (IgniteTxRollbackCheckedException ignore) {
+ // Safe to retry right away.
+ }
+ }
}
/**
@@ -2060,4 +2086,4 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
return S.toString(RemoveDataStructureProcessor.class, this);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
index 94cbfbd..c7750a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
@@ -22,19 +22,22 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteQueue;
import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.transactions.TransactionRollbackException;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
- * {@link org.apache.ignite.IgniteQueue} implementation using transactional cache.
+ * {@link IgniteQueue} implementation using transactional cache.
*/
public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
/**
@@ -76,7 +79,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
break;
}
}
- catch (ClusterTopologyCheckedException e) {
+ catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
if (e instanceof ClusterGroupEmptyCheckedException)
throw e;
@@ -123,14 +126,14 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
break;
}
- catch (ClusterTopologyCheckedException e) {
+ catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
if (e instanceof ClusterGroupEmptyCheckedException)
throw e;
if (cnt++ == MAX_UPDATE_RETRIES)
throw e;
else {
- U.warn(log, "Failed to add item, will retry [err=" + e + ']');
+ U.warn(log, "Failed to poll item, will retry [err=" + e + ']');
U.sleep(RETRY_DELAY);
}
@@ -180,7 +183,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
break;
}
- catch (ClusterTopologyCheckedException e) {
+ catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
if (e instanceof ClusterGroupEmptyCheckedException)
throw e;
@@ -223,7 +226,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
break;
}
- catch (ClusterTopologyCheckedException e) {
+ catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
if (e instanceof ClusterGroupEmptyCheckedException)
throw e;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileAffinityRange.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileAffinityRange.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileAffinityRange.java
index 5f083b6..9cf1da4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileAffinityRange.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileAffinityRange.java
@@ -274,7 +274,6 @@ public class IgfsFileAffinityRange implements Message, Externalizable {
writer.incrementState();
case 1:
- // The field 'done' was removed, but its writing preserved for compatibility reasons.
if (!writer.writeBoolean("done", done))
return false;
@@ -320,7 +319,6 @@ public class IgfsFileAffinityRange implements Message, Externalizable {
reader.incrementState();
case 1:
- // field 'done' was removed, but reading preserved for compatibility reasons.
done = reader.readBoolean("done");
if (!reader.isLastRead())
@@ -371,4 +369,4 @@ public class IgfsFileAffinityRange implements Message, Externalizable {
@Override public String toString() {
return S.toString(IgfsFileAffinityRange.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/portable/GridPortableMetaDataSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/portable/GridPortableMetaDataSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/portable/GridPortableMetaDataSelfTest.java
index 9054297..fa3c9a7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/portable/GridPortableMetaDataSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/portable/GridPortableMetaDataSelfTest.java
@@ -148,6 +148,8 @@ public class GridPortableMetaDataSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testNoConfiguration() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-1377");
+
portables().toPortable(new TestObject3());
assertNotNull(portables().metadata(TestObject3.class));
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java
index 0f16862..9c4b7b2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java
@@ -26,6 +26,7 @@ import javax.cache.configuration.Factory;
import javax.cache.integration.CacheLoaderException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.store.CacheStore;
@@ -104,6 +105,7 @@ public abstract class CacheStoreUsageMultinodeAbstractTest extends GridCommonAbs
ccfg.setCacheMode(PARTITIONED);
ccfg.setAtomicityMode(atomicityMode());
+ ccfg.setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.PRIMARY);
ccfg.setBackups(1);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
@@ -220,10 +222,16 @@ public abstract class CacheStoreUsageMultinodeAbstractTest extends GridCommonAbs
Transaction tx = tc != null ? ignite.transactions().txStart(tc, REPEATABLE_READ) : null;
- cache.put(key, key);
+ try {
+ cache.put(key, key);
- if (tx != null)
- tx.commit();
+ if (tx != null)
+ tx.commit();
+ }
+ finally {
+ if (tx != null)
+ tx.close();
+ }
boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override
@@ -310,4 +318,4 @@ public abstract class CacheStoreUsageMultinodeAbstractTest extends GridCommonAbs
// No-op.
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index faebcfe..3e646d3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
@@ -2866,6 +2867,53 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}
/**
+ * @param keys0 Keys to check.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected void checkUnlocked(final Collection<String> keys0) throws IgniteCheckedException {
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ for (int i = 0; i < gridCount(); i++) {
+ GridCacheAdapter<Object, Object> cache = ((IgniteKernal)ignite(i)).internalCache();
+
+ for (String key : keys0) {
+ GridCacheEntryEx entry = cache.peekEx(key);
+
+ if (entry != null) {
+ if (entry.lockedByAny()) {
+ info("Entry is still locked [i=" + i + ", entry=" + entry + ']');
+
+ return false;
+ }
+ }
+
+ if (cache.isNear()) {
+ entry = cache.context().near().dht().peekEx(key);
+
+ if (entry != null) {
+ if (entry.lockedByAny()) {
+ info("Entry is still locked [i=" + i + ", entry=" + entry + ']');
+
+ return false;
+ }
+ }
+ }
+ }
+ }
+
+ return true;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ info("Entry was removed, will retry");
+
+ return false;
+ }
+ }
+ }, 10_000);
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testGlobalClearAll() throws Exception {
@@ -5173,4 +5221,4 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
/** */
ONE_BY_ONE
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccSelfTest.java
index f2bb8fa..cdf8eca 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccSelfTest.java
@@ -1650,7 +1650,7 @@ public class GridCacheMvccSelfTest extends GridCommonAbstractTest {
* @param vers Ordered versions.
*/
private void checkOrder(Collection<GridCacheMvccCandidate> cands, GridCacheVersion... vers) {
- assert cands.size() == vers.length;
+ assertEquals(vers.length, cands.size());
int i = 0;
@@ -1841,4 +1841,4 @@ public class GridCacheMvccSelfTest extends GridCommonAbstractTest {
for (GridCacheMvccCandidate c : cands)
info(">>> " + c);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllFailoverSelfTest.java
index 26911a3..b401907 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllFailoverSelfTest.java
@@ -358,6 +358,32 @@ public class GridCachePutAllFailoverSelfTest extends GridCommonAbstractTest {
info(">>> Absent keys: " + absentKeys);
+ if (!F.isEmpty(absentKeys)) {
+ for (Ignite g : runningWorkers) {
+ IgniteKernal k = (IgniteKernal)g;
+
+ info(">>>> Entries on node: " + k.getLocalNodeId());
+
+ GridCacheAdapter<Object, Object> cache = k.internalCache("partitioned");
+
+ for (Integer key : absentKeys) {
+ GridCacheEntryEx entry = cache.peekEx(key);
+
+ if (entry != null)
+ info(" >>> " + entry);
+
+ if (cache.context().isNear()) {
+ GridCacheEntryEx entry0 = cache.context().near().dht().peekEx(key);
+
+ if (entry0 != null)
+ info(" >>> " + entry);
+ }
+ }
+
+ info("");
+ }
+ }
+
assertTrue(absentKeys.isEmpty());
// Actual primary cache size.
@@ -746,4 +772,4 @@ public class GridCachePutAllFailoverSelfTest extends GridCommonAbstractTest {
return failedOverJobs;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
index c5ad4ec..fc14085 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
@@ -161,6 +161,8 @@ public class IgniteCachePutAllRestartTest extends GridCommonAbstractTest {
final Ignite ignite = ignite(node);
+ info("Running iteration on the node [idx=" + node + ", nodeId=" + ignite.cluster().localNode().id() + ']');
+
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
Thread.currentThread().setName("put-thread");