You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/15 14:37:20 UTC
[2/3] ignite git commit: ignite-4768 txs
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 98f1140..13ca26a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -39,7 +39,6 @@ import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -91,7 +90,6 @@ import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
import static org.apache.ignite.transactions.TransactionState.ACTIVE;
-import static org.apache.ignite.transactions.TransactionState.COMMITTED;
import static org.apache.ignite.transactions.TransactionState.COMMITTING;
import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
@@ -192,12 +190,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
/** */
protected boolean onePhaseCommit;
- /** */
- protected CacheWriteSynchronizationMode syncMode;
-
- /** If this transaction contains transform entries. */
- protected boolean transform;
-
/** Commit version. */
private volatile GridCacheVersion commitVer;
@@ -207,9 +199,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
/** Done marker. */
protected volatile boolean isDone;
- /** Preparing flag (no need for volatile modifier). */
- private boolean preparing;
-
/** */
@GridToStringInclude
private Map<Integer, Set<Integer>> invalidParts;
@@ -416,8 +405,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
- @Override public boolean storeUsed() {
- return storeEnabled() && txState().storeUsed(cctx);
+ @Override public boolean storeWriteThrough() {
+ return storeEnabled() && txState().storeWriteThrough(cctx);
}
/**
@@ -508,32 +497,10 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
}
- /** {@inheritDoc} */
- @Override public void onRemap(AffinityTopologyVersion topVer) {
- assert false : this;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasTransforms() {
- return transform;
- }
-
- /** {@inheritDoc} */
- @Override public boolean markPreparing() {
- synchronized (this) {
- if (preparing)
- return false;
-
- preparing = true;
-
- return true;
- }
- }
-
/**
* @return {@code True} if marked.
*/
- @Override public boolean markFinalizing(FinalizationStatus status) {
+ @Override public final boolean markFinalizing(FinalizationStatus status) {
boolean res;
switch (status) {
@@ -625,26 +592,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
- @Override public boolean replicated() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public CacheWriteSynchronizationMode syncMode() {
- if (syncMode != null)
- return syncMode;
-
- return txState().syncMode(cctx);
- }
-
- /**
- * @param syncMode Write synchronization mode.
- */
- public void syncMode(CacheWriteSynchronizationMode syncMode) {
- this.syncMode = syncMode;
- }
-
- /** {@inheritDoc} */
@Override public IgniteUuid xid() {
return xidVer.asGridUuid();
}
@@ -897,30 +844,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
}
- /**
- *
- */
- @Override public void close() throws IgniteCheckedException {
- TransactionState state = state();
-
- if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED)
- rollback();
-
- synchronized (this) {
- try {
- while (!done())
- wait();
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- if (!done())
- throw new IgniteCheckedException("Got interrupted while waiting for transaction to complete: " +
- this, e);
- }
- }
- }
-
/** {@inheritDoc} */
@Override public boolean needsCompletedVersions() {
return false;
@@ -1176,12 +1099,12 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
- @Override public boolean isSystemInvalidate() {
+ @Override public final boolean isSystemInvalidate() {
return sysInvalidate;
}
/** {@inheritDoc} */
- @Override public void systemInvalidate(boolean sysInvalidate) {
+ @Override public final void systemInvalidate(boolean sysInvalidate) {
this.sysInvalidate = sysInvalidate;
}
@@ -1950,21 +1873,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
- @Override public void commit() {
- throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
- }
-
- /** {@inheritDoc} */
- @Override public void rollback() {
- throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
- }
-
- /** {@inheritDoc} */
@Override public boolean activeCachesDeploymentEnabled() {
return false;
}
@@ -1995,7 +1903,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
- @Override public boolean storeUsed() {
+ @Override public boolean storeWriteThrough() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
@@ -2029,11 +1937,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
- @Override public void onRemap(AffinityTopologyVersion topVer) {
- throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
- }
-
- /** {@inheritDoc} */
@Override public void commitError(Throwable e) {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
@@ -2044,11 +1947,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
- @Override public boolean markPreparing() {
- throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
- }
-
- /** {@inheritDoc} */
@Override public boolean markFinalizing(FinalizationStatus status) {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
@@ -2134,11 +2032,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
- @Override public boolean replicated() {
- return false;
- }
-
- /** {@inheritDoc} */
@Override public UUID subjectId() {
return null;
}
@@ -2154,11 +2047,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
- @Override public CacheWriteSynchronizationMode syncMode() {
- return null;
- }
-
- /** {@inheritDoc} */
@Override public boolean hasWriteKey(IgniteTxKey key) {
return false;
}
@@ -2236,12 +2124,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
- @Override public void prepare() throws IgniteCheckedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<IgniteInternalTx> prepareAsync() {
+ @Override public IgniteInternalFuture<?> salvageTx() {
return null;
}
@@ -2371,11 +2254,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
- @Override public boolean hasTransforms() {
- return false;
- }
-
- /** {@inheritDoc} */
@Override public boolean equals(Object o) {
return this == o || o instanceof IgniteInternalTx && xid.equals(((IgniteInternalTx)o).xid());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 77387b0..4a1e085 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -255,7 +255,7 @@ public class IgniteTxHandler {
req.last());
if (locTx.isRollbackOnly())
- locTx.rollbackAsync();
+ locTx.rollbackNearTxLocalAsync();
return fut.chain(new C1<IgniteInternalFuture<GridNearTxPrepareResponse>, GridNearTxPrepareResponse>() {
@Override public GridNearTxPrepareResponse apply(IgniteInternalFuture<GridNearTxPrepareResponse> f) {
@@ -491,7 +491,7 @@ public class IgniteTxHandler {
if (tx.isRollbackOnly() && !tx.commitOnPrepare()) {
if (tx.state() != TransactionState.ROLLED_BACK && tx.state() != TransactionState.ROLLING_BACK)
- tx.rollbackAsync();
+ tx.rollbackDhtLocalAsync();
}
final GridDhtTxLocal tx0 = tx;
@@ -849,10 +849,11 @@ public class IgniteTxHandler {
assert req.syncMode() != null : req;
tx.syncMode(req.syncMode());
+ tx.nearFinishFutureId(req.futureId());
+ tx.nearFinishMiniId(req.miniId());
+ tx.storeEnabled(req.storeEnabled());
if (req.commit()) {
- tx.storeEnabled(req.storeEnabled());
-
if (!tx.markFinalizing(USER_FINISH)) {
if (log.isDebugEnabled())
log.debug("Will not finish transaction (it is handled by another thread): " + tx);
@@ -860,10 +861,7 @@ public class IgniteTxHandler {
return null;
}
- tx.nearFinishFutureId(req.futureId());
- tx.nearFinishMiniId(req.miniId());
-
- IgniteInternalFuture<IgniteInternalTx> commitFut = tx.commitAsync();
+ IgniteInternalFuture<IgniteInternalTx> commitFut = tx.commitDhtLocalAsync();
// Only for error logging.
commitFut.listen(CU.errorLogger(log));
@@ -871,10 +869,7 @@ public class IgniteTxHandler {
return commitFut;
}
else {
- tx.nearFinishFutureId(req.futureId());
- tx.nearFinishMiniId(req.miniId());
-
- IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync();
+ IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackDhtLocalAsync();
// Only for error logging.
rollbackFut.listen(CU.errorLogger(log));
@@ -891,7 +886,7 @@ public class IgniteTxHandler {
IgniteInternalFuture<IgniteInternalTx> res;
- IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync();
+ IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackDhtLocalAsync();
// Only for error logging.
rollbackFut.listen(CU.errorLogger(log));
@@ -932,7 +927,7 @@ public class IgniteTxHandler {
throw e;
if (tx != null)
- return tx.rollbackAsync();
+ return tx.rollbackNearTxLocalAsync();
return new GridFinishedFuture<>(e);
}
@@ -1157,7 +1152,7 @@ public class IgniteTxHandler {
if (completeFut != null) {
completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) {
+ @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
sendReply(nodeId, req, true, nearTxId);
}
});
@@ -1561,8 +1556,6 @@ public class IgniteTxHandler {
assert !F.isEmpty(req.transactionNodes()) :
"Received last prepare request with empty transaction nodes: " + req;
- tx.transactionNodes(req.transactionNodes());
-
tx.state(PREPARED);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index bffb295..9417e1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -154,13 +154,13 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
}
/** {@inheritDoc} */
- @Override public boolean storeUsed(GridCacheSharedContext cctx) {
+ @Override public boolean storeWriteThrough(GridCacheSharedContext cctx) {
if (cacheCtx == null)
return false;
CacheStoreManager store = cacheCtx.store();
- return store.configured();
+ return store.configured() && store.isWriteThrough();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index d457399..dc4e52f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -30,6 +30,7 @@ import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -139,6 +140,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
@GridToStringInclude
protected IgniteTxLocalState txState;
+ /** */
+ protected CacheWriteSynchronizationMode syncMode;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -199,6 +203,23 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
txState = implicitSingle ? new IgniteTxImplicitSingleStateImpl() : new IgniteTxStateImpl();
}
+ /**
+ * @return Transaction write synchronization mode.
+ */
+ public final CacheWriteSynchronizationMode syncMode() {
+ if (syncMode != null)
+ return syncMode;
+
+ return txState().syncMode(cctx);
+ }
+
+ /**
+ * @param syncMode Write synchronization mode.
+ */
+ public void syncMode(CacheWriteSynchronizationMode syncMode) {
+ this.syncMode = syncMode;
+ }
+
/** {@inheritDoc} */
@Override public IgniteTxState txState() {
return txState;
@@ -410,21 +431,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
}
- /** {@inheritDoc} */
- @Override public void commit() throws IgniteCheckedException {
- try {
- commitAsync().get();
- }
- finally {
- cctx.tm().resetContext();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void prepare() throws IgniteCheckedException {
- prepareAsync().get();
- }
-
/**
* Checks that locks are in proper state for commit.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index 0cf1d67..307c348 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -17,15 +17,8 @@
package org.apache.ignite.internal.processors.cache.transactions;
-import java.util.Collection;
-import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.lang.GridInClosure3;
import org.jetbrains.annotations.Nullable;
/**
@@ -59,5 +52,5 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
* @return {@code True} if state has been changed.
* @throws IgniteCheckedException If finish failed.
*/
- public boolean finish(boolean commit) throws IgniteCheckedException;
+ public boolean localFinish(boolean commit) throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index ff4a4e6..af406fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -82,7 +82,6 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionConcurrency;
@@ -127,7 +126,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/** Slow tx warn timeout (initialized to 0). */
private static final int SLOW_TX_WARN_TIMEOUT = Integer.getInteger(IGNITE_SLOW_TX_WARN_TIMEOUT, 0);
- /** Tx salvage timeout (default 3s). */
+ /** Tx salvage timeout. */
private static final int TX_SALVAGE_TIMEOUT = Integer.getInteger(IGNITE_TX_SALVAGE_TIMEOUT, 100);
/** One phase commit deferred ack request timeout. */
@@ -138,9 +137,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
private static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE =
Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE, 256);
- /** Version in which deadlock detection introduced. */
- public static final IgniteProductVersion TX_DEADLOCK_DETECTION_SINCE = IgniteProductVersion.fromString("1.5.19");
-
/** Deadlock detection maximum iterations. */
static int DEADLOCK_MAX_ITERS =
IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000);
@@ -184,7 +180,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
PER_SEGMENT_Q);
/** Pending one phase commit ack requests sender. */
- private GridDeferredAckMessageSender deferredAckMessageSender;
+ private GridDeferredAckMessageSender deferredAckMsgSnd;
/** Transaction finish synchronizer. */
private GridCacheTxFinishSync txFinishSync;
@@ -216,7 +212,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
txHnd = new IgniteTxHandler(cctx);
- deferredAckMessageSender = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) {
+ deferredAckMsgSnd = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) {
@Override public int getTimeout() {
return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
}
@@ -256,6 +252,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
UUID nodeId = discoEvt.eventNode().id();
+ // Wait some time in case there are some unprocessed messages from failed node.
cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(nodeId));
if (txFinishSync != null)
@@ -305,85 +302,35 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* Invalidates transaction.
*
* @param tx Transaction.
- * @return {@code True} if transaction was salvaged by this call.
*/
- public boolean salvageTx(IgniteInternalTx tx) {
- return salvageTx(tx, false, USER_FINISH);
+ public void salvageTx(IgniteInternalTx tx) {
+ salvageTx(tx, USER_FINISH);
}
/**
* Invalidates transaction.
*
* @param tx Transaction.
- * @param warn {@code True} if warning should be logged.
* @param status Finalization status.
- * @return {@code True} if transaction was salvaged by this call.
*/
- private boolean salvageTx(IgniteInternalTx tx, boolean warn, IgniteInternalTx.FinalizationStatus status) {
+ private void salvageTx(IgniteInternalTx tx, IgniteInternalTx.FinalizationStatus status) {
assert tx != null;
TransactionState state = tx.state();
- if (state == ACTIVE || state == PREPARING || state == PREPARED) {
- try {
- if (!tx.markFinalizing(status)) {
- if (log.isDebugEnabled())
- log.debug("Will not try to commit invalidate transaction (could not mark finalized): " + tx);
-
- return false;
- }
-
- tx.systemInvalidate(true);
-
- tx.prepare();
-
- if (tx.state() == PREPARING) {
- if (log.isDebugEnabled())
- log.debug("Ignoring transaction in PREPARING state as it is currently handled " +
- "by another thread: " + tx);
-
- return false;
- }
-
- if (tx instanceof IgniteTxRemoteEx) {
- IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
-
- rmtTx.doneRemote(tx.xidVersion(), Collections.<GridCacheVersion>emptyList(),
- Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList());
- }
-
- tx.commit();
-
- if (warn) {
- // This print out cannot print any peer-deployed entity either
- // directly or indirectly.
- U.warn(log, "Invalidated transaction because originating node either " +
- "crashed or left grid: " + CU.txString(tx));
- }
- }
- catch (IgniteCheckedException ignore) {
+ if (state == ACTIVE || state == PREPARING || state == PREPARED || state == MARKED_ROLLBACK) {
+ if (!tx.markFinalizing(status)) {
if (log.isDebugEnabled())
- log.debug("Optimistic failure while invalidating transaction (will rollback): " +
- tx.xidVersion());
+ log.debug("Will not try to commit invalidate transaction (could not mark finalized): " + tx);
- try {
- tx.rollback();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to rollback transaction: " + tx.xidVersion(), e);
- }
- }
- }
- else if (state == MARKED_ROLLBACK) {
- try {
- tx.rollback();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to rollback transaction: " + tx.xidVersion(), e);
+ return;
}
- }
- return true;
+ tx.salvageTx();
+
+ if (log.isDebugEnabled())
+ log.debug("Invalidated transaction because originating node left grid: " + CU.txString(tx));
+ }
}
/**
@@ -427,7 +374,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @return {@code True} if transaction has been committed or rolled back,
* {@code false} otherwise.
*/
- public boolean isCompleted(IgniteInternalTx tx) {
+ private boolean isCompleted(IgniteInternalTx tx) {
boolean completed = completedVersHashMap.containsKey(tx.xidVersion());
// Need check that for tx with timeout rollback message was not received before lock.
@@ -1237,7 +1184,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
unlockMultiple(tx, tx.readEntries());
// 6. Notify evictions.
- notifyEvitions(tx);
+ notifyEvictions(tx);
// 7. Remove obsolete entries from cache.
removeObsolete(tx);
@@ -1310,7 +1257,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
unlockMultiple(tx, tx.readEntries());
// 4. Notify evictions.
- notifyEvitions(tx);
+ notifyEvictions(tx);
// 5. Remove obsolete entries.
removeObsolete(tx);
@@ -1360,7 +1307,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (txIdMap.remove(tx.xidVersion(), tx)) {
// 1. Notify evictions.
- notifyEvitions(tx);
+ notifyEvictions(tx);
// 2. Evict near entries.
if (!tx.readMap().isEmpty()) {
@@ -1396,7 +1343,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
*
* @param tx Tx to uncommit.
*/
- public void uncommitTx(IgniteInternalTx tx) {
+ void uncommitTx(IgniteInternalTx tx) {
assert tx != null;
if (log.isDebugEnabled())
@@ -1413,15 +1360,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
unlockMultiple(tx, tx.readEntries());
// 3. Notify evictions.
- notifyEvitions(tx);
+ notifyEvictions(tx);
// 4. Remove from per-thread storage.
clearThreadMap(tx);
// 5. Unregister explicit locks.
- if (!tx.alternateVersions().isEmpty())
+ if (!tx.alternateVersions().isEmpty()) {
for (GridCacheVersion ver : tx.alternateVersions())
idMap.remove(ver);
+ }
// 6. Remove Near-2-DHT mappings.
if (tx instanceof GridCacheMappedVersion)
@@ -1477,7 +1425,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/**
* @param tx Transaction to notify evictions for.
*/
- private void notifyEvitions(IgniteInternalTx tx) {
+ private void notifyEvictions(IgniteInternalTx tx) {
if (tx.internal())
return;
@@ -2056,43 +2004,27 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
return;
}
- if (supportsDeadlockDetection(node)) {
- TxLocksRequest req = new TxLocksRequest(fut.futureId(), txKeys);
+ TxLocksRequest req = new TxLocksRequest(fut.futureId(), txKeys);
- try {
- if (!cctx.localNodeId().equals(nodeId))
- req.prepareMarshal(cctx);
-
- cctx.gridIO().sendToGridTopic(node, TOPIC_TX, req, SYSTEM_POOL);
- }
- catch (IgniteCheckedException e) {
- if (e instanceof ClusterTopologyCheckedException) {
- if (log.isDebugEnabled())
- log.debug("Failed to finish deadlock detection, node left: " + nodeId);
- }
- else
- U.warn(log, "Failed to finish deadlock detection: " + e, e);
+ try {
+ if (!cctx.localNodeId().equals(nodeId))
+ req.prepareMarshal(cctx);
- fut.onDone();
- }
+ cctx.gridIO().sendToGridTopic(node, TOPIC_TX, req, SYSTEM_POOL);
}
- else {
- if (log.isDebugEnabled())
- log.debug("Failed to finish deadlock detection, node does not support deadlock detection: " + node);
+ catch (IgniteCheckedException e) {
+ if (e instanceof ClusterTopologyCheckedException) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to finish deadlock detection, node left: " + nodeId);
+ }
+ else
+ U.warn(log, "Failed to finish deadlock detection: " + e, e);
fut.onDone();
}
}
/**
- * @param node Node.
- * @return {@code True} if node supports deadlock detection protocol.
- */
- private boolean supportsDeadlockDetection(ClusterNode node) {
- return TX_DEADLOCK_DETECTION_SINCE.compareToIgnoreTimestamp(node.version()) <= 0;
- }
-
- /**
* @param tx Tx.
* @param txKeys Tx keys.
* @return {@code True} if key is involved into tx.
@@ -2263,7 +2195,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @param ver Version to ack.
*/
public void sendDeferredAckResponse(UUID nodeId, GridCacheVersion ver) {
- deferredAckMessageSender.sendDeferredAckMessage(nodeId, ver);
+ deferredAckMsgSnd.sendDeferredAckMessage(nodeId, ver);
}
/**
@@ -2312,9 +2244,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
", failedNodeId=" + evtNodeId + ']');
for (final IgniteInternalTx tx : txs()) {
- if ((tx.near() && !tx.local()) || (tx.storeUsed() && tx.masterNodeIds().contains(evtNodeId))) {
+ if ((tx.near() && !tx.local()) || (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId))) {
// Invalidate transactions.
- salvageTx(tx, false, RECOVERY_FINISH);
+ salvageTx(tx, RECOVERY_FINISH);
}
else {
// Check prepare only if originating node ID failed. Otherwise parent node will finish this tx.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
index 1c2ccbe..3c27bad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
@@ -88,7 +88,7 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState
}
/** {@inheritDoc} */
- @Override public boolean storeUsed(GridCacheSharedContext cctx) {
+ @Override public boolean storeWriteThrough(GridCacheSharedContext cctx) {
return false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index c121b1b..822e44e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -104,7 +104,7 @@ public interface IgniteTxState {
* @return {@code True} if transaction is allowed to use store and transactions spans one or more caches with
* store enabled.
*/
- public boolean storeUsed(GridCacheSharedContext cctx);
+ public boolean storeWriteThrough(GridCacheSharedContext cctx);
/**
* @param cctx Context.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 76751de..399eea3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -289,14 +289,14 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
}
/** {@inheritDoc} */
- @Override public boolean storeUsed(GridCacheSharedContext cctx) {
+ @Override public boolean storeWriteThrough(GridCacheSharedContext cctx) {
if (!activeCacheIds.isEmpty()) {
for (int i = 0; i < activeCacheIds.size(); i++) {
int cacheId = (int)activeCacheIds.get(i);
CacheStoreManager store = cctx.cacheContext(cacheId).store();
- if (store.configured())
+ if (store.configured() && store.isWriteThrough())
return true;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index c8c9219..0420182 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -395,7 +395,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsMap.put(key, seq);
- tx.commitTopLevelTx();
+ tx.commit();
return seq;
}
@@ -496,7 +496,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsMap.put(key, a);
- tx.commitTopLevelTx();
+ tx.commit();
return a;
}
@@ -560,7 +560,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
T dataStructure = c.applyx();
- tx.commitTopLevelTx();
+ tx.commit();
return dataStructure;
}
@@ -641,7 +641,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
T rmvInfo = c.applyx();
- tx.commitTopLevelTx();
+ tx.commit();
if (afterRmv != null && rmvInfo != null)
afterRmv.applyx(rmvInfo);
@@ -709,7 +709,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsMap.put(key, ref);
- tx.commitTopLevelTx();
+ tx.commit();
return ref;
}
@@ -813,7 +813,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsMap.put(key, stmp);
- tx.commitTopLevelTx();
+ tx.commit();
return stmp;
}
@@ -1048,7 +1048,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
T col = c.applyx(cacheCtx);
- tx.commitTopLevelTx();
+ tx.commit();
return col;
}
@@ -1162,7 +1162,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsMap.put(key, latch);
- tx.commitTopLevelTx();
+ tx.commit();
return latch;
}
@@ -1211,7 +1211,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsView.remove(key);
- tx.commitTopLevelTx();
+ tx.commit();
}
else
tx.setRollbackOnly();
@@ -1283,7 +1283,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsMap.put(key, sem0);
- tx.commitTopLevelTx();
+ tx.commit();
return sem0;
}
@@ -1329,7 +1329,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsView.remove(key);
- tx.commitTopLevelTx();
+ tx.commit();
}
else
tx.setRollbackOnly();
@@ -1401,7 +1401,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsMap.put(key, reentrantLock0);
- tx.commitTopLevelTx();
+ tx.commit();
return reentrantLock0;
}
@@ -1448,7 +1448,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsView.remove(key);
- tx.commitTopLevelTx();
+ tx.commit();
}
else
tx.setRollbackOnly();
@@ -1481,7 +1481,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
if (val != null) {
dsView.remove(key);
- tx.commitTopLevelTx();
+ tx.commit();
}
else
tx.setRollbackOnly();
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
index 9ebea2c..640b72d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
@@ -102,7 +102,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
atomicView.put(key, val);
- tx.commitTopLevelTx();
+ tx.commit();
return retVal;
}
@@ -129,7 +129,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
atomicView.put(key, val);
- tx.commitTopLevelTx();
+ tx.commit();
return retVal;
}
@@ -156,7 +156,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
atomicView.put(key, val);
- tx.commitTopLevelTx();
+ tx.commit();
return retVal;
}
@@ -183,7 +183,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
atomicView.put(key, val);
- tx.commitTopLevelTx();
+ tx.commit();
return retVal;
}
@@ -442,7 +442,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
atomicView.put(key, val);
- tx.commitTopLevelTx();
+ tx.commit();
return retVal;
}
@@ -476,7 +476,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
atomicView.put(key, val);
- tx.commitTopLevelTx();
+ tx.commit();
return retVal;
}
@@ -510,7 +510,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
atomicView.put(key, val);
- tx.commitTopLevelTx();
+ tx.commit();
return retVal;
}
@@ -547,7 +547,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
atomicView.getAndPut(key, val);
- tx.commitTopLevelTx();
+ tx.commit();
}
return retVal;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
index 51568bc..6911b3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
@@ -223,7 +223,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
atomicView.put(key, ref);
- tx.commitTopLevelTx();
+ tx.commit();
return true;
}
@@ -265,7 +265,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
atomicView.getAndPut(key, ref);
- tx.commitTopLevelTx();
+ tx.commit();
return expVal;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 2572f19..87aae8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -545,7 +545,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
seqView.put(key, seq);
- tx.commitTopLevelTx();
+ tx.commit();
return curLocVal;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
index ec1e766..14f80e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
@@ -277,7 +277,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
atomicView.put(key, stmp);
- tx.commitTopLevelTx();
+ tx.commit();
return true;
}
@@ -321,7 +321,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
atomicView.getAndPut(key, stmp);
- tx.commitTopLevelTx();
+ tx.commit();
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 03a7fb6..45c3677 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -292,7 +292,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
return new CountDownLatch(0);
}
- tx.commitTopLevelTx();
+ tx.commit();
return new CountDownLatch(val.get());
}
@@ -432,7 +432,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
latchView.put(key, latchVal);
- tx.commitTopLevelTx();
+ tx.commit();
return retVal;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
index a62b656..5f0cb44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
@@ -50,7 +50,6 @@ import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -561,7 +560,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
lockView.put(key, val);
- tx.commitTopLevelTx();
+ tx.commit();
return true;
}
@@ -629,7 +628,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
lockView.put(key, val);
- tx.commitTopLevelTx();
+ tx.commit();
// Keep track of all threads that are queued in global queue.
// We deliberately don't use #sync.isQueued(), because AQS
@@ -647,7 +646,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
lockView.put(key, val);
- tx.commitTopLevelTx();
+ tx.commit();
sync.waitingThreads.remove(thread.getId());
@@ -806,7 +805,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
lockView.put(key, val);
- tx.commitTopLevelTx();
+ tx.commit();
return true;
}
@@ -1099,7 +1098,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
return null;
}
- tx.rollbackTopLevelTx();
+ tx.rollback();
return new Sync(val);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index c3e9218..a1c0515 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -41,7 +41,6 @@ import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -320,7 +319,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
semView.put(key, val);
- tx.commitTopLevelTx();
+ tx.commit();
}
return retVal;
@@ -373,7 +372,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
Map<UUID, Integer> map = val.getWaiters();
if (!map.containsKey(nodeId)) {
- tx.rollbackTopLevelTx();
+ tx.rollback();
return false;
}
@@ -391,7 +390,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
sync.nodeMap = map;
- tx.commitTopLevelTx();
+ tx.commit();
return true;
}
@@ -472,7 +471,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
final boolean failoverSafe = val.isFailoverSafe();
- tx.commitTopLevelTx();
+ tx.commit();
return new Sync(cnt, waiters, failoverSafe);
}
@@ -687,7 +686,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
int cnt = val.getCount();
- tx.rollbackTopLevelTx();
+ tx.rollback();
return cnt;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
index 4b2d6cc..846eb69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
@@ -70,7 +70,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
else
retVal = false;
- tx.commitTopLevelTx();
+ tx.commit();
return retVal;
}
@@ -106,7 +106,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
retVal = (T)cache.getAndRemove(itemKey(idx));
if (retVal == null) { // Possible if data was lost.
- tx.commitTopLevelTx();
+ tx.commit();
continue;
}
@@ -114,7 +114,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
else
retVal = null;
- tx.commitTopLevelTx();
+ tx.commit();
return retVal;
}
@@ -164,7 +164,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
else
retVal = false;
- tx.commitTopLevelTx();
+ tx.commit();
return retVal;
}
@@ -197,7 +197,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
cache.remove(itemKey(idx));
}
- tx.commitTopLevelTx();
+ tx.commit();
}
return null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index dce97c7..acd0a1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -661,7 +661,7 @@ public class IgfsDataManager extends IgfsManager {
if (val != null) {
putBlock(fileInfo.blockSize(), key, val);
- tx.commitTopLevelTx();
+ tx.commit();
}
else {
// File is being concurrently deleted.
@@ -1086,7 +1086,7 @@ public class IgfsDataManager extends IgfsManager {
"[key=" + colocatedKey + ", relaxedKey=" + key + ", startOff=" + startOff +
", dataLen=" + data.length + ']');
- tx.commitTopLevelTx();
+ tx.commit();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 9ff3d40..77272e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -615,7 +615,7 @@ public class IgfsMetaManager extends IgfsManager {
IgfsEntryInfo newInfo = invokeLock(fileId, del);
- tx.commitTopLevelTx();
+ tx.commit();
return newInfo;
}
@@ -1039,7 +1039,7 @@ public class IgfsMetaManager extends IgfsManager {
transferEntry(srcEntry, srcParentInfo.id(), srcName, dstParentInfo.id(), dstName);
- tx.commitTopLevelTx();
+ tx.commit();
// Fire events.
IgfsPath newPath = new IgfsPath(dstPathIds.path(), dstName);
@@ -1172,7 +1172,7 @@ public class IgfsMetaManager extends IgfsManager {
// Note that root directory properties and other attributes are preserved:
id2InfoPrj.put(IgfsUtils.ROOT_ID, rootInfo.listing(null));
- tx.commitTopLevelTx();
+ tx.commit();
signalDeleteWorker();
@@ -1310,7 +1310,7 @@ public class IgfsMetaManager extends IgfsManager {
transferEntry(parentInfo.listing().get(victimName), parentId, victimName, trashId, trashName);
- tx.commitTopLevelTx();
+ tx.commit();
signalDeleteWorker();
@@ -1401,7 +1401,7 @@ public class IgfsMetaManager extends IgfsManager {
id2InfoPrj.put(parentId, parentInfo.listing(newListing));
}
- tx.commitTopLevelTx();
+ tx.commit();
return res;
}
@@ -1454,7 +1454,7 @@ public class IgfsMetaManager extends IgfsManager {
id2InfoPrj.remove(id);
- tx.commitTopLevelTx();
+ tx.commit();
return true;
}
@@ -1519,7 +1519,7 @@ public class IgfsMetaManager extends IgfsManager {
try (GridNearTxLocal tx = startTx()) {
IgfsEntryInfo info = updatePropertiesNonTx(fileId, props);
- tx.commitTopLevelTx();
+ tx.commit();
return info;
}
@@ -1560,7 +1560,7 @@ public class IgfsMetaManager extends IgfsManager {
IgfsEntryInfo newInfo =
invokeAndGet(fileId, new IgfsMetaFileReserveSpaceProcessor(space, affRange));
- tx.commitTopLevelTx();
+ tx.commit();
return newInfo;
}
@@ -1616,7 +1616,7 @@ public class IgfsMetaManager extends IgfsManager {
throw fsException("Failed to update file info (file types differ)" +
" [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", proc=" + proc + ']');
- tx.commitTopLevelTx();
+ tx.commit();
return newInfo;
}
@@ -1679,7 +1679,7 @@ public class IgfsMetaManager extends IgfsManager {
continue;
// Commit TX.
- tx.commitTopLevelTx();
+ tx.commit();
generateCreateEvents(res.createdPaths(), false);
@@ -1711,7 +1711,7 @@ public class IgfsMetaManager extends IgfsManager {
try (GridNearTxLocal tx = startTx()) {
Object prev = val != null ? metaCache.getAndPut(sampling, val) : metaCache.getAndRemove(sampling);
- tx.commitTopLevelTx();
+ tx.commit();
return !F.eq(prev, val);
}
@@ -2756,7 +2756,7 @@ public class IgfsMetaManager extends IgfsManager {
}
}
- tx.commitTopLevelTx();
+ tx.commit();
}
catch (IgniteCheckedException e) {
if (!finished) {
@@ -2839,7 +2839,7 @@ public class IgfsMetaManager extends IgfsManager {
modificationTime == -1 ? targetInfo.modificationTime() : modificationTime)
);
- tx.commitTopLevelTx();
+ tx.commit();
return;
}
@@ -2948,7 +2948,7 @@ public class IgfsMetaManager extends IgfsManager {
// At this point we can open the stream safely.
info = invokeLock(info.id(), false);
- tx.commitTopLevelTx();
+ tx.commit();
IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
@@ -2963,7 +2963,7 @@ public class IgfsMetaManager extends IgfsManager {
continue;
// Commit.
- tx.commitTopLevelTx();
+ tx.commit();
// Generate events.
generateCreateEvents(res.createdPaths(), true);
@@ -3103,7 +3103,7 @@ public class IgfsMetaManager extends IgfsManager {
newBlockSize, affKey, newLockId, evictExclude, newLen));
// Prepare result and commit.
- tx.commitTopLevelTx();
+ tx.commit();
IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
@@ -3131,7 +3131,7 @@ public class IgfsMetaManager extends IgfsManager {
continue;
// Commit.
- tx.commitTopLevelTx();
+ tx.commit();
// Generate events.
generateCreateEvents(res.createdPaths(), true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 280817c..a680a88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1042,7 +1042,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
cache.put(key, assigns);
- tx.commitTopLevelTx();
+ tx.commit();
break;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index 53e6add..c4d8a79 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -88,6 +88,8 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
blockedMsgs.add(new T2<>(node, ioMsg));
+ notifyAll();
+
return;
}
}
@@ -137,6 +139,33 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
}
/**
+ * @param cls Exact class of the blocked message to wait for (compared by identity in {@code hasMessage}).
+ * @param nodeName Instance name expected on the node recorded with the blocked message.
+ * @throws InterruptedException If the calling thread is interrupted while waiting on this SPI's monitor.
+ */
+ public void waitForMessage(Class<?> cls, String nodeName) throws InterruptedException {
+ synchronized (this) {
+ while (!hasMessage(cls, nodeName))
+ wait();
+ }
+ }
+
+ /**
+ * @param cls Exact message class to look for; matched with {@code ==}, so subclasses do not count.
+ * @param nodeName Value the recorded node's {@code ATTR_IGNITE_INSTANCE_NAME} attribute must equal.
+ * @return {@code True} if {@code blockedMsgs} holds a message of that class recorded for that node.
+ */
+ private boolean hasMessage(Class<?> cls, String nodeName) {
+ for (T2<ClusterNode, GridIoMessage> blocked : blockedMsgs) {
+ if (blocked.get2().message().getClass() != cls)
+ continue;
+ if (nodeName.equals(blocked.get1().attribute(ATTR_IGNITE_INSTANCE_NAME)))
+ return true;
+ }
+ return false;
+ }
+
+ /**
* @param blockP Message block predicate.
*/
public void blockMessages(IgnitePredicate<GridIoMessage> blockP) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
index 84e439f..4fd4989 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
@@ -217,7 +217,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
cache.put("key", "val");
- tx.commitTopLevelTx();
+ tx.commit();
}
assert cache.containsKey("key");
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
index f79c3e7..f821a45 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
@@ -18,9 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed;
import java.util.Map;
-import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
@@ -28,12 +26,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -92,7 +88,7 @@ public class IgniteCacheSystemTransactionsSelfTest extends GridCacheAbstractSelf
utilityCache.getAndPut("3", "3");
- itx.commitTopLevelTx();
+ itx.commit();
}
jcache.put("2", "22");
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
index 65fa7e0..91e3b26 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
@@ -176,7 +176,7 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri
cache.putAll(map);
try {
- txEx.prepareTopLevelTx().get(3, TimeUnit.SECONDS);
+ txEx.prepareNearTxLocal().get(3, TimeUnit.SECONDS);
}
catch (IgniteFutureTimeoutCheckedException ignored) {
info("Failed to wait for prepare future completion: " + partial);
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
index 3c1ae8e..4997b20 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
@@ -42,7 +42,7 @@ import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -349,7 +349,7 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest
TransactionProxyImpl txProxy = (TransactionProxyImpl)tx;
- IgniteInternalTx txEx = txProxy.tx();
+ GridNearTxLocal txEx = txProxy.tx();
assertTrue(txEx.pessimistic());
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
index 97385ab..7ca3914 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -215,7 +215,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
- IgniteInternalFuture<?> prepFut = txEx.prepareTopLevelTx();
+ IgniteInternalFuture<?> prepFut = txEx.prepareNearTxLocal();
waitPrepared(ignite(1));
@@ -376,7 +376,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
- IgniteInternalFuture<?> prepFut = txEx.prepareTopLevelTx();
+ IgniteInternalFuture<?> prepFut = txEx.prepareNearTxLocal();
waitPrepared(ignite(1));