You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/14 16:35:03 UTC
[2/2] ignite git commit: ignite-4768 txs
ignite-4768 txs
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ceecfa3c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ceecfa3c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ceecfa3c
Branch: refs/heads/ignite-4768-1
Commit: ceecfa3cc67908783592a72aad1a22d7fc2f4332
Parents: 5523eac
Author: sboikov <sb...@gridgain.com>
Authored: Tue Mar 14 18:17:29 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Mar 14 19:34:07 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 11 +-
.../processors/cache/GridCacheProxyImpl.java | 2 +-
.../cache/GridCacheSharedContext.java | 2 +-
.../processors/cache/GridCacheUtils.java | 7 +-
.../processors/cache/IgniteInternalCache.java | 2 +-
.../GridDistributedTxRemoteAdapter.java | 22 ++--
.../dht/GridDhtTransactionalCacheAdapter.java | 39 ++-----
.../cache/distributed/dht/GridDhtTxLocal.java | 4 -
.../distributed/dht/GridDhtTxLocalAdapter.java | 6 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 2 +-
.../near/GridNearTransactionalCache.java | 2 +-
.../cache/distributed/near/GridNearTxLocal.java | 66 ++++++++----
.../cache/transactions/IgniteInternalTx.java | 3 -
.../cache/transactions/IgniteTxHandler.java | 30 +++---
.../cache/transactions/IgniteTxRemoteEx.java | 11 ++
.../datastructures/DataStructuresProcessor.java | 60 +++++------
.../datastructures/GridCacheAtomicLongImpl.java | 34 +++---
.../GridCacheAtomicReferenceImpl.java | 10 +-
.../GridCacheAtomicSequenceImpl.java | 6 +-
.../GridCacheAtomicStampedImpl.java | 10 +-
.../GridCacheCountDownLatchImpl.java | 10 +-
.../datastructures/GridCacheLockImpl.java | 20 ++--
.../datastructures/GridCacheSemaphoreImpl.java | 27 ++---
.../GridTransactionalCacheQueueImpl.java | 20 ++--
.../processors/igfs/IgfsDataManager.java | 59 +++++-----
.../processors/igfs/IgfsMetaManager.java | 107 +++++++++----------
.../service/GridServiceProcessor.java | 8 +-
.../cache/IgniteTxConfigCacheSelfTest.java | 6 +-
.../IgniteCacheSystemTransactionsSelfTest.java | 5 +-
...xOriginatingNodeFailureAbstractSelfTest.java | 6 +-
...ePrimaryNodeFailureRecoveryAbstractTest.java | 9 +-
...lockMessageSystemPoolStarvationSelfTest.java | 8 +-
.../processors/cache/jta/CacheJtaResource.java | 10 +-
33 files changed, 323 insertions(+), 301 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index c98cd24..bbf19f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -89,16 +89,13 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
-import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
import org.apache.ignite.internal.processors.dr.IgniteDrDataStreamerCacheUpdater;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
@@ -3290,7 +3287,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public IgniteInternalTx txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation) {
+ @Override public GridNearTxLocal txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation) {
IgniteTransactionsEx txs = ctx.kernalContext().cache().transactions();
return txs.txStartEx(ctx, concurrency, isolation);
@@ -4153,7 +4150,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<IgniteInternalTx> f = new GridEmbeddedFuture<>(fut,
new C2<Object, Exception, IgniteInternalFuture<IgniteInternalTx>>() {
@Override public IgniteInternalFuture<IgniteInternalTx> apply(Object o, Exception e) {
- return tx.commitAsync();
+ return tx.commitTopLevelTxAsync();
}
});
@@ -4162,7 +4159,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return f;
}
- IgniteInternalFuture<IgniteInternalTx> f = tx.commitAsync();
+ IgniteInternalFuture<IgniteInternalTx> f = tx.commitTopLevelTxAsync();
saveFuture(holder, f);
@@ -4243,7 +4240,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
catch (IgniteCheckedException e) {
if (!(e instanceof IgniteTxRollbackCheckedException)) {
try {
- tx.rollback();
+ tx.rollbackTopLevelTx();
e = new IgniteTxRollbackCheckedException("Transaction has been rolled back: " +
tx.xid(), e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 731d23b..c54ac2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -940,7 +940,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
}
/** {@inheritDoc} */
- @Override public IgniteInternalTx txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation) {
+ @Override public GridNearTxLocal txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation) {
CacheOperationContext prev = gate.enter(opCtx);
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index e4f8fec..989a810 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -749,7 +749,7 @@ public class GridCacheSharedContext<K, V> {
if (ctx == null) {
tx.txState().awaitLastFut(this);
- return tx.commitAsync();
+ return tx.commitTopLevelTxAsync();
}
else
return ctx.cache().commitTxAsync(tx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 3e68b70..77a99fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -891,7 +892,7 @@ public class GridCacheUtils {
* @param isolation Isolation.
* @return New transaction.
*/
- public static IgniteInternalTx txStartInternal(GridCacheContext ctx, IgniteInternalCache prj,
+ public static GridNearTxLocal txStartInternal(GridCacheContext ctx, IgniteInternalCache prj,
TransactionConcurrency concurrency, TransactionIsolation isolation) {
assert ctx != null;
assert prj != null;
@@ -1257,10 +1258,10 @@ public class GridCacheUtils {
public static <K, V> void inTx(IgniteInternalCache<K, V> cache, TransactionConcurrency concurrency,
TransactionIsolation isolation, IgniteInClosureX<IgniteInternalCache<K ,V>> clo) throws IgniteCheckedException {
- try (IgniteInternalTx tx = cache.txStartEx(concurrency, isolation);) {
+ try (GridNearTxLocal tx = cache.txStartEx(concurrency, isolation);) {
clo.applyx(cache);
- tx.commit();
+ tx.commitTopLevelTx();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index 02b6461..5471335 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -953,7 +953,7 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
* @param isolation Isolation.
* @return New transaction.
*/
- public IgniteInternalTx txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation);
+ public GridNearTxLocal txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation);
/**
* Starts transaction with specified isolation, concurrency, timeout, invalidation flag,
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index e60d3dc..5be1fe9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -731,7 +731,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
/** {@inheritDoc} */
- @Override public void commit() throws IgniteCheckedException {
+ @Override public final void commitRemoteTx() throws IgniteCheckedException {
if (optimistic())
state(PREPARED);
@@ -750,12 +750,17 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
if (!isSystemInvalidate())
throw new IgniteCheckedException("Invalid transaction state for commit [state=" + state + ", tx=" + this + ']');
- rollback();
+ rollbackRemoteTx();
}
commitIfLocked();
}
+ /** {@inheritDoc} */
+ @Override public void commit() throws IgniteCheckedException {
+ commitRemoteTx();
+ }
+
/**
* Forces commit for this tx.
*
@@ -768,7 +773,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
/** {@inheritDoc} */
@Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
try {
- commit();
+ commitRemoteTx();
return new GridFinishedFuture<IgniteInternalTx>(this);
}
@@ -778,8 +783,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
/** {@inheritDoc} */
- @SuppressWarnings({"CatchGenericClass"})
- @Override public void rollback() {
+ @Override public void rollbackRemoteTx() {
try {
// Note that we don't evict near entries here -
// they will be deleted by their corresponding transactions.
@@ -797,8 +801,14 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
/** {@inheritDoc} */
+ @SuppressWarnings({"CatchGenericClass"})
+ @Override public void rollback() {
+ rollbackRemoteTx();
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() {
- rollback();
+ rollbackRemoteTx();
return new GridFinishedFuture<IgniteInternalTx>(this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index dea4072..0c63e45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -307,7 +307,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (tx.state() == COMMITTING)
tx.forceCommit();
else
- tx.rollback();
+ tx.rollbackRemoteTx();
}
return null;
@@ -362,7 +362,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (log.isDebugEnabled())
log.debug("Rolling back remote DHT transaction because it is empty [req=" + req + ", res=" + res + ']');
- tx.rollback();
+ tx.rollbackRemoteTx();
tx = null;
}
@@ -374,7 +374,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
* @param nodeId Node ID.
* @param req Request.
*/
- protected final void processDhtLockRequest(final UUID nodeId, final GridDhtLockRequest req) {
+ private void processDhtLockRequest(final UUID nodeId, final GridDhtLockRequest req) {
if (txLockMsgLog.isDebugEnabled()) {
txLockMsgLog.debug("Received dht lock request [txId=" + req.nearXidVersion() +
", dhtTxId=" + req.version() +
@@ -452,7 +452,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
* @param nodeId Node ID.
* @param req Request.
*/
- protected final void processDhtLockRequest0(UUID nodeId, GridDhtLockRequest req) {
+ private void processDhtLockRequest0(UUID nodeId, GridDhtLockRequest req) {
assert nodeId != null;
assert req != null;
assert !nodeId.equals(locNodeId);
@@ -602,7 +602,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
* @param nodeId Node ID.
* @param req Request.
*/
- protected void processDhtUnlockRequest(UUID nodeId, GridDhtUnlockRequest req) {
+ private void processDhtUnlockRequest(UUID nodeId, GridDhtUnlockRequest req) {
clearLocks(nodeId, req);
if (isNearEnabled(cacheCfg))
@@ -1038,31 +1038,12 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
t.xidVersion(),
e);
- if (resp.error() == null && t.onePhaseCommit()) {
- assert t.implicit();
-
- return t.commitAsync().chain(
- new C1<IgniteInternalFuture<IgniteInternalTx>, GridNearLockResponse>() {
- @Override public GridNearLockResponse apply(IgniteInternalFuture<IgniteInternalTx> f) {
- try {
- // Check for error.
- f.get();
- }
- catch (IgniteCheckedException e1) {
- resp.error(e1);
- }
-
- sendLockReply(nearNode, t, req, resp);
-
- return resp;
- }
- });
- }
- else {
- sendLockReply(nearNode, t, req, resp);
+ assert !t.implicit() : t;
+ assert !t.onePhaseCommit() : t;
- return new GridFinishedFuture<>(resp);
- }
+ sendLockReply(nearNode, t, req, resp);
+
+ return new GridFinishedFuture<>(resp);
}
}
);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index bff69bc..6dd40b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -550,10 +550,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
if (log.isDebugEnabled())
log.debug("Committing dht local tx: " + this);
- // In optimistic mode prepare was called explicitly.
- if (pessimistic())
- prepareAsync();
-
final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, true);
cctx.mvcc().addFuture(fut, fut.futureId());
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index c7967d3..8e82c53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -29,7 +29,6 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -654,7 +653,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
needRetVal,
createTtl,
accessTtl,
- null,
skipStore,
keepBinary);
}
@@ -673,7 +671,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
* @param needRetVal Return value flag.
* @param createTtl TTL for create operation.
* @param accessTtl TTL for read operation.
- * @param filter Entry write filter.
* @param skipStore Skip store flag.
* @return Future for lock acquisition.
*/
@@ -685,7 +682,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
final boolean needRetVal,
final long createTtl,
final long accessTtl,
- @Nullable final CacheEntryPredicate[] filter,
boolean skipStore,
boolean keepBinary) {
if (log.isDebugEnabled())
@@ -729,7 +725,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/*retval*/false,
/*read*/read,
accessTtl,
- filter == null ? CU.empty0() : filter,
+ CU.empty0(),
/*computeInvoke*/false);
return ret;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 6189b38..cd3b0ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -160,7 +160,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
if (e instanceof IgniteTxRollbackCheckedException) {
if (marked) {
try {
- tx.rollback();
+ tx.rollbackTopLevelTx();
}
catch (IgniteCheckedException ex) {
U.error(log, "Failed to automatically rollback transaction: " + tx, ex);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index c39af34..5ad05b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -377,7 +377,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
log.debug("Node requesting lock left grid (lock request will be ignored): " + req);
if (tx != null)
- tx.rollback();
+ tx.rollbackRemoteTx();
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index aa94ea7..5a5470b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -687,7 +687,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) {
if (implicit())
try {
- commit();
+ commitTopLevelTx();
}
catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
@@ -965,7 +965,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
for (Object key : keys) {
if (key == null) {
- rollback();
+ rollbackTopLevelTx();
throw new NullPointerException("Null key.");
}
@@ -1473,7 +1473,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
if (F.isEmpty(keys0)) {
if (implicit()) {
try {
- commit();
+ commitTopLevelTx();
}
catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
@@ -1604,7 +1604,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
// with prepare response, if required.
assert loadFut.isDone();
- return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
+ return nonInterruptable(commitTopLevelTxAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
@Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
throws IgniteCheckedException {
try {
@@ -2371,7 +2371,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
return new GridFinishedFuture<>(e);
}
- return nonInterruptable(commitAsync().chain(
+ return nonInterruptable(commitTopLevelTxAsync().chain(
new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
@Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
throws IgniteCheckedException {
@@ -3067,8 +3067,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
return true;
}
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> prepareAsync() {
+ /**
+ * @return Tx prepare future.
+ */
+ public IgniteInternalFuture<?> prepareTopLevelTx() {
GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)prepFut;
if (fut == null) {
@@ -3104,8 +3106,21 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
/** {@inheritDoc} */
- @SuppressWarnings({"ThrowableInstanceNeverThrown"})
- @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
+ @Override public IgniteInternalFuture<?> prepareAsync() {
+ return prepareTopLevelTx();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void commitTopLevelTx() throws IgniteCheckedException {
+ commitTopLevelTxAsync().get();
+ }
+
+ /**
+ * @return Finish future.
+ */
+ public IgniteInternalFuture<IgniteInternalTx> commitTopLevelTxAsync() {
if (log.isDebugEnabled())
log.debug("Committing near local tx: " + this);
@@ -3121,7 +3136,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
return new GridFinishedFuture<>((IgniteInternalTx)this);
}
- prepareAsync();
+ final IgniteInternalFuture<?> prepareFut = prepareTopLevelTx();
GridNearTxFinishFuture fut = commitFut;
@@ -3131,8 +3146,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
cctx.mvcc().addFuture(fut, fut.futureId());
- final IgniteInternalFuture<?> prepareFut = prepFut;
-
prepareFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> f) {
GridNearTxFinishFuture fut0 = commitFut;
@@ -3163,7 +3176,21 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() {
+ @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
+ return commitTopLevelTxAsync();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void rollbackTopLevelTx() throws IgniteCheckedException {
+ rollbackTopLevelTxAsync().get();
+ }
+
+ /**
+ * @return Rollback future.
+ */
+ public IgniteInternalFuture<IgniteInternalTx> rollbackTopLevelTxAsync() {
if (log.isDebugEnabled())
log.debug("Rolling back near tx: " + this);
@@ -3226,6 +3253,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
return fut;
}
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() {
+ return rollbackTopLevelTxAsync();
+ }
+
/**
* @return {@code True} if 'fast finish' path can be used for transaction completion.
*/
@@ -3300,7 +3332,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
fut.onError(new IgniteTxRollbackCheckedException("Failed to prepare transaction: " + this, e));
try {
- rollback();
+ rollbackTopLevelTx();
}
catch (IgniteTxOptimisticCheckedException e1) {
if (log.isDebugEnabled())
@@ -3325,10 +3357,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
if (log.isDebugEnabled())
log.debug("Committing colocated tx locally: " + this);
- // In optimistic mode prepare was called explicitly.
- if (pessimistic())
- prepareAsync();
-
IgniteInternalFuture<?> prep = prepFut;
// Do not create finish future if there are no remote nodes.
@@ -3905,7 +3933,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
// Commit implicit transactions.
if (implicit())
- commit();
+ commitTopLevelTx();
rollback = false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index e3adfc9..d26696e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -35,7 +35,6 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.lang.GridTuple;
-import org.apache.ignite.lang.IgniteAsyncSupported;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -184,7 +183,6 @@ public interface IgniteInternalTx extends AutoCloseable {
*
* @throws IgniteCheckedException If commit failed.
*/
- @IgniteAsyncSupported
public void commit() throws IgniteCheckedException;
/**
@@ -199,7 +197,6 @@ public interface IgniteInternalTx extends AutoCloseable {
*
* @throws IgniteCheckedException If rollback failed.
*/
- @IgniteAsyncSupported
public void rollback() throws IgniteCheckedException;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 331ca31..77387b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -711,8 +711,10 @@ public class IgniteTxHandler {
* @param req Request.
* @return Future.
*/
- @Nullable public IgniteInternalFuture<IgniteInternalTx> finish(UUID nodeId, @Nullable GridNearTxLocal locTx,
- GridNearTxFinishRequest req) {
+ @Nullable public IgniteInternalFuture<IgniteInternalTx> finish(UUID nodeId,
+ @Nullable GridNearTxLocal locTx,
+ GridNearTxFinishRequest req)
+ {
assert nodeId != null;
assert req != null;
@@ -763,8 +765,10 @@ public class IgniteTxHandler {
* @param req Finish request.
* @return Finish future.
*/
- private IgniteInternalFuture<IgniteInternalTx> finishDhtLocal(UUID nodeId, @Nullable GridNearTxLocal locTx,
- GridNearTxFinishRequest req) {
+ private IgniteInternalFuture<IgniteInternalTx> finishDhtLocal(UUID nodeId,
+ @Nullable GridNearTxLocal locTx,
+ GridNearTxFinishRequest req)
+ {
GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version());
GridDhtTxLocal tx = null;
@@ -1011,7 +1015,7 @@ public class IgniteTxHandler {
U.error(log, "Failed to process prepare request: " + req, e);
if (nearTx != null)
- nearTx.rollback();
+ nearTx.rollbackRemoteTx();
res = new GridDhtTxPrepareResponse(
req.partition(),
@@ -1212,12 +1216,12 @@ public class IgniteTxHandler {
tx.setPartitionUpdateCounters(
req.partUpdateCounters() != null ? req.partUpdateCounters().array() : null);
- tx.commit();
+ tx.commitRemoteTx();
}
else {
tx.doneRemote(req.baseVersion(), null, null, null);
- tx.rollback();
+ tx.rollbackRemoteTx();
}
}
catch (Throwable e) {
@@ -1228,7 +1232,7 @@ public class IgniteTxHandler {
tx.systemInvalidate(true);
try {
- tx.commit();
+ tx.commitRemoteTx();
}
catch (IgniteCheckedException ex) {
U.error(log, "Failed to invalidate transaction: " + tx, ex);
@@ -1255,7 +1259,7 @@ public class IgniteTxHandler {
// Complete remote candidates.
tx.doneRemote(req.version(), null, null, null);
- tx.commit();
+ tx.commitRemoteTx();
}
catch (IgniteTxHeuristicCheckedException e) {
// Just rethrow this exception. Transaction was already uncommitted.
@@ -1268,7 +1272,7 @@ public class IgniteTxHandler {
tx.invalidate(true);
tx.systemInvalidate(true);
- tx.rollback();
+ tx.rollbackRemoteTx();
if (e instanceof Error)
throw (Error)e;
@@ -1314,10 +1318,10 @@ public class IgniteTxHandler {
}
if (nearTx != null)
- nearTx.rollback();
+ nearTx.rollbackRemoteTx();
if (dhtTx != null)
- dhtTx.rollback();
+ dhtTx.rollbackRemoteTx();
}
}
@@ -1565,7 +1569,7 @@ public class IgniteTxHandler {
res.invalidPartitionsByCacheId(tx.invalidPartitions());
if (tx.empty() && req.last()) {
- tx.rollback();
+ tx.rollbackRemoteTx();
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
index 8ceca3f..87cc7cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.transactions;
import java.util.Collection;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
/**
@@ -25,6 +26,16 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
*/
public interface IgniteTxRemoteEx extends IgniteInternalTx {
/**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void commitRemoteTx() throws IgniteCheckedException;
+
+ /**
+ *
+ */
+ public void rollbackRemoteTx();
+
+ /**
* @param baseVer Base version.
* @param committedVers Committed version.
* @param rolledbackVers Rolled back version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 96644a3..c8c9219 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -67,7 +67,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheInternal;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.lang.IgniteClosureX;
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
@@ -95,8 +95,8 @@ import static org.apache.ignite.internal.processors.datastructures.DataStructure
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_STAMPED;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.COUNT_DOWN_LATCH;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.QUEUE;
-import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SEMAPHORE;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.REENTRANT_LOCK;
+import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SEMAPHORE;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SET;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -342,7 +342,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicSequenceValue seqVal = cast(dsView.get(key), GridCacheAtomicSequenceValue.class);
// Check that sequence hasn't been created in other thread yet.
@@ -395,7 +395,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsMap.put(key, seq);
- tx.commit();
+ tx.commitTopLevelTx();
return seq;
}
@@ -471,7 +471,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = cast(dsView.get(key), GridCacheAtomicLongValue.class);
// Check that atomic long hasn't been created in other thread yet.
@@ -496,7 +496,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsMap.put(key, a);
- tx.commit();
+ tx.commitTopLevelTx();
return a;
}
@@ -551,7 +551,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
if (!create)
return c.applyx();
- try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
IgniteCheckedException err =
utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
@@ -560,7 +560,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
T dataStructure = c.applyx();
- tx.commit();
+ tx.commitTopLevelTx();
return dataStructure;
}
@@ -623,7 +623,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
retryTopologySafe(new IgniteOutClosureX<Void>() {
@Override public Void applyx() throws IgniteCheckedException {
- try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
T2<Boolean, IgniteCheckedException> res =
utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get();
@@ -641,7 +641,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
T rmvInfo = c.applyx();
- tx.commit();
+ tx.commitTopLevelTx();
if (afterRmv != null && rmvInfo != null)
afterRmv.applyx(rmvInfo);
@@ -682,7 +682,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicReferenceValue val = cast(dsView.get(key),
GridCacheAtomicReferenceValue.class);
@@ -709,7 +709,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsMap.put(key, ref);
- tx.commit();
+ tx.commitTopLevelTx();
return ref;
}
@@ -786,7 +786,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicStampedValue val = cast(dsView.get(key),
GridCacheAtomicStampedValue.class);
@@ -813,7 +813,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsMap.put(key, stmp);
- tx.commit();
+ tx.commitTopLevelTx();
return stmp;
}
@@ -1033,7 +1033,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
return retryTopologySafe(new IgniteOutClosureX<T>() {
@Override public T applyx() throws IgniteCheckedException {
- try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
T2<String, IgniteCheckedException> res =
utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get();
@@ -1048,7 +1048,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
T col = c.applyx(cacheCtx);
- tx.commit();
+ tx.commitTopLevelTx();
return col;
}
@@ -1133,7 +1133,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheCountDownLatchValue val = cast(dsView.get(key), GridCacheCountDownLatchValue.class);
// Check that count down hasn't been created in other thread yet.
@@ -1162,7 +1162,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsMap.put(key, latch);
- tx.commit();
+ tx.commitTopLevelTx();
return latch;
}
@@ -1198,7 +1198,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
// Check correctness type of removable object.
GridCacheCountDownLatchValue val =
cast(dsView.get(key), GridCacheCountDownLatchValue.class);
@@ -1211,7 +1211,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsView.remove(key);
- tx.commit();
+ tx.commitTopLevelTx();
}
else
tx.setRollbackOnly();
@@ -1254,7 +1254,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class);
// Check that semaphore hasn't been created in other thread yet.
@@ -1283,7 +1283,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsMap.put(key, sem0);
- tx.commit();
+ tx.commitTopLevelTx();
return sem0;
}
@@ -1319,7 +1319,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
// Check correctness type of removable object.
GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class);
@@ -1329,7 +1329,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsView.remove(key);
- tx.commit();
+ tx.commitTopLevelTx();
}
else
tx.setRollbackOnly();
@@ -1371,7 +1371,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheLockState val = cast(dsView.get(key), GridCacheLockState.class);
// Check that reentrant lock hasn't been created in other thread yet.
@@ -1401,7 +1401,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsMap.put(key, reentrantLock0);
- tx.commit();
+ tx.commitTopLevelTx();
return reentrantLock0;
}
@@ -1438,7 +1438,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
// Check correctness type of removable object.
GridCacheLockState val = cast(dsView.get(key), GridCacheLockState.class);
@@ -1448,7 +1448,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsView.remove(key);
- tx.commit();
+ tx.commitTopLevelTx();
}
else
tx.setRollbackOnly();
@@ -1474,14 +1474,14 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
return CU.outTx(
new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
// Check correctness type of removable object.
R val = cast(dsView.get(key), cls);
if (val != null) {
dsView.remove(key);
- tx.commit();
+ tx.commitTopLevelTx();
}
else
tx.setRollbackOnly();
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
index dfd2122..9ebea2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
@@ -29,7 +29,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -90,7 +90,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
/** Callable for {@link #incrementAndGet()}. */
private final Callable<Long> incAndGetCall = retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
if (val == null)
@@ -102,7 +102,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
atomicView.put(key, val);
- tx.commit();
+ tx.commitTopLevelTx();
return retVal;
}
@@ -117,7 +117,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
/** Callable for {@link #getAndIncrement()}. */
private final Callable<Long> getAndIncCall = retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
if (val == null)
@@ -129,7 +129,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
atomicView.put(key, val);
- tx.commit();
+ tx.commitTopLevelTx();
return retVal;
}
@@ -144,7 +144,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
/** Callable for {@link #decrementAndGet()}. */
private final Callable<Long> decAndGetCall = retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
if (val == null)
@@ -156,7 +156,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
atomicView.put(key, val);
- tx.commit();
+ tx.commitTopLevelTx();
return retVal;
}
@@ -171,7 +171,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
/** Callable for {@link #getAndDecrement()}. */
private final Callable<Long> getAndDecCall = retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
if (val == null)
@@ -183,7 +183,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
atomicView.put(key, val);
- tx.commit();
+ tx.commitTopLevelTx();
return retVal;
}
@@ -430,7 +430,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
private Callable<Long> internalAddAndGet(final long l) {
return retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
if (val == null)
@@ -442,7 +442,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
atomicView.put(key, val);
- tx.commit();
+ tx.commitTopLevelTx();
return retVal;
}
@@ -464,7 +464,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
private Callable<Long> internalGetAndAdd(final long l) {
return retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
if (val == null)
@@ -476,7 +476,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
atomicView.put(key, val);
- tx.commit();
+ tx.commitTopLevelTx();
return retVal;
}
@@ -498,7 +498,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
private Callable<Long> internalGetAndSet(final long l) {
return new Callable<Long>() {
@Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
if (val == null)
@@ -510,7 +510,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
atomicView.put(key, val);
- tx.commit();
+ tx.commitTopLevelTx();
return retVal;
}
@@ -534,7 +534,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
private Callable<Long> internalCompareAndSetAndGet(final long expVal, final long newVal) {
return new Callable<Long>() {
@Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
if (val == null)
@@ -547,7 +547,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
atomicView.getAndPut(key, val);
- tx.commit();
+ tx.commitTopLevelTx();
}
return retVal;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
index 448dd8b..51568bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
@@ -29,7 +29,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -213,7 +213,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
private Callable<Boolean> internalSet(final T val) {
return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
if (ref == null)
@@ -223,7 +223,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
atomicView.put(key, ref);
- tx.commit();
+ tx.commitTopLevelTx();
return true;
}
@@ -247,7 +247,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
private Callable<T> internalCompareAndSetAndGet(final T expVal, final T newVal) {
return retryTopologySafe(new Callable<T>() {
@Override public T call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
if (ref == null)
@@ -265,7 +265,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
atomicView.getAndPut(key, ref);
- tx.commit();
+ tx.commitTopLevelTx();
return expVal;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 754d8f5..2572f19 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -35,7 +35,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -486,7 +486,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
private Callable<Long> internalUpdate(final long l, final boolean updated) {
return retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicSequenceValue seq = seqView.get(key);
checkRemoved();
@@ -545,7 +545,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
seqView.put(key, seq);
- tx.commit();
+ tx.commitTopLevelTx();
return curLocVal;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
index 6ac303c..ec1e766 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
@@ -29,7 +29,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -267,7 +267,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
private Callable<Boolean> internalSet(final T val, final S stamp) {
return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
if (stmp == null)
@@ -277,7 +277,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
atomicView.put(key, stmp);
- tx.commit();
+ tx.commitTopLevelTx();
return true;
}
@@ -305,7 +305,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
final IgniteClosure<S, S> newStampClos) {
return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
if (stmp == null)
@@ -321,7 +321,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
atomicView.getAndPut(key, stmp);
- tx.commit();
+ tx.commitTopLevelTx();
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 723fb55..03a7fb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -32,7 +32,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -282,7 +282,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
internalLatch = CU.outTx(
retryTopologySafe(new Callable<CountDownLatch>() {
@Override public CountDownLatch call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheCountDownLatchValue val = latchView.get(key);
if (val == null) {
@@ -292,7 +292,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
return new CountDownLatch(0);
}
- tx.commit();
+ tx.commitTopLevelTx();
return new CountDownLatch(val.get());
}
@@ -407,7 +407,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/** {@inheritDoc} */
@Override public Integer call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheCountDownLatchValue latchVal = latchView.get(key);
if (latchVal == null) {
@@ -432,7 +432,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
latchView.put(key, latchVal);
- tx.commit();
+ tx.commitTopLevelTx();
return retVal;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
index 1cf78fa..a62b656 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
@@ -49,6 +49,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -520,8 +521,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
return CU.outTx(
retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
-
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheLockState val = lockView.get(key);
if (val == null)
@@ -561,7 +561,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
lockView.put(key, val);
- tx.commit();
+ tx.commitTopLevelTx();
return true;
}
@@ -614,7 +614,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
return CU.outTx(
retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheLockState val = lockView.get(key);
if (val == null)
@@ -629,7 +629,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
lockView.put(key, val);
- tx.commit();
+ tx.commitTopLevelTx();
// Keep track of all threads that are queued in global queue.
// We deliberately don't use #sync.isQueued(), because AQS
@@ -647,7 +647,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
lockView.put(key, val);
- tx.commit();
+ tx.commitTopLevelTx();
sync.waitingThreads.remove(thread.getId());
@@ -711,7 +711,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
return CU.outTx(
retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheLockState val = lockView.get(key);
if (val == null)
@@ -806,7 +806,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
lockView.put(key, val);
- tx.commit();
+ tx.commitTopLevelTx();
return true;
}
@@ -1089,7 +1089,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
sync = CU.outTx(
retryTopologySafe(new Callable<Sync>() {
@Override public Sync call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheLockState val = lockView.get(key);
if (val == null) {
@@ -1099,7 +1099,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
return null;
}
- tx.rollback();
+ tx.rollbackTopLevelTx();
return new Sync(val);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ceecfa3c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index a11c79d..c3e9218 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -285,7 +286,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
return CU.outTx(
retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx,
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx,
semView,
PESSIMISTIC, REPEATABLE_READ)
) {
@@ -319,7 +320,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
semView.put(key, val);
- tx.commit();
+ tx.commitTopLevelTx();
}
return retVal;
@@ -359,7 +360,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try (
- IgniteInternalTx tx = CU.txStartInternal(ctx,
+ GridNearTxLocal tx = CU.txStartInternal(ctx,
semView,
PESSIMISTIC, REPEATABLE_READ)
) {
@@ -372,7 +373,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
Map<UUID, Integer> map = val.getWaiters();
if (!map.containsKey(nodeId)) {
- tx.rollback();
+ tx.rollbackTopLevelTx();
return false;
}
@@ -390,7 +391,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
sync.nodeMap = map;
- tx.commit();
+ tx.commitTopLevelTx();
return true;
}
@@ -454,7 +455,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
sync = CU.outTx(
retryTopologySafe(new Callable<Sync>() {
@Override public Sync call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx,
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx,
semView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheSemaphoreState val = semView.get(key);
@@ -465,15 +466,15 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
return null;
}
- final int count = val.getCount();
+ final int cnt = val.getCount();
Map<UUID, Integer> waiters = val.getWaiters();
final boolean failoverSafe = val.isFailoverSafe();
- tx.commit();
+ tx.commitTopLevelTx();
- return new Sync(count, waiters, failoverSafe);
+ return new Sync(cnt, waiters, failoverSafe);
}
}
}),
@@ -676,7 +677,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
retryTopologySafe(new Callable<Integer>() {
@Override public Integer call() throws Exception {
try (
- IgniteInternalTx tx = CU.txStartInternal(ctx,
+ GridNearTxLocal tx = CU.txStartInternal(ctx,
semView, PESSIMISTIC, REPEATABLE_READ)
) {
GridCacheSemaphoreState val = semView.get(key);
@@ -684,11 +685,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
if (val == null)
throw new IgniteException("Failed to find semaphore with given name: " + name);
- int count = val.getCount();
+ int cnt = val.getCount();
- tx.rollback();
+ tx.rollbackTopLevelTx();
- return count;
+ return cnt;
}
}
}),