You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/22 11:42:35 UTC
[33/50] [abbrv] incubator-ignite git commit: GG-9141 - Renaming.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 8b6a693..5dfcf2a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -17,6 +17,7 @@ import org.gridgain.grid.kernal.managers.discovery.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.cache.distributed.*;
import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
+import org.gridgain.grid.kernal.processors.cache.transactions.*;
import org.gridgain.grid.util.future.*;
import org.gridgain.grid.util.lang.*;
import org.gridgain.grid.util.tostring.*;
@@ -36,8 +37,8 @@ import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
/**
*
*/
-public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFuture<GridCacheTxEx<K, V>>
- implements GridCacheMvccFuture<K, V, GridCacheTxEx<K, V>> {
+public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFuture<IgniteTxEx<K, V>>
+ implements GridCacheMvccFuture<K, V, IgniteTxEx<K, V>> {
/** */
private static final long serialVersionUID = 0L;
@@ -79,12 +80,12 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
* @param tx Transaction.
*/
public GridNearTxPrepareFuture(GridCacheSharedContext<K, V> cctx, final GridNearTxLocal<K, V> tx) {
- super(cctx.kernalContext(), new IgniteReducer<GridCacheTxEx<K, V>, GridCacheTxEx<K, V>>() {
- @Override public boolean collect(GridCacheTxEx<K, V> e) {
+ super(cctx.kernalContext(), new IgniteReducer<IgniteTxEx<K, V>, IgniteTxEx<K, V>>() {
+ @Override public boolean collect(IgniteTxEx<K, V> e) {
return true;
}
- @Override public GridCacheTxEx<K, V> reduce() {
+ @Override public IgniteTxEx<K, V> reduce() {
// Nothing to aggregate.
return tx;
}
@@ -202,11 +203,11 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
* @return {@code True} if all locks are owned.
*/
private boolean checkLocks() {
- Collection<GridCacheTxEntry<K, V>> checkEntries = tx.groupLock() ?
+ Collection<IgniteTxEntry<K, V>> checkEntries = tx.groupLock() ?
Collections.singletonList(tx.groupLockEntry()) :
tx.writeEntries();
- for (GridCacheTxEntry<K, V> txEntry : checkEntries) {
+ for (IgniteTxEntry<K, V> txEntry : checkEntries) {
// Wait for near locks only.
if (!txEntry.context().isNear())
continue;
@@ -251,7 +252,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
*/
public void onResult(UUID nodeId, GridNearTxPrepareResponse<K, V> res) {
if (!isDone()) {
- for (IgniteFuture<GridCacheTxEx<K, V>> fut : pending()) {
+ for (IgniteFuture<IgniteTxEx<K, V>> fut : pending()) {
if (isMini(fut)) {
MiniFuture f = (MiniFuture)fut;
@@ -266,7 +267,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
}
/** {@inheritDoc} */
- @Override public boolean onDone(GridCacheTxEx<K, V> t, Throwable err) {
+ @Override public boolean onDone(IgniteTxEx<K, V> t, Throwable err) {
// If locks were not acquired yet, delay completion.
if (isDone() || (err == null && !checkLocks()))
return false;
@@ -434,7 +435,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
try {
prepare(
- tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<GridCacheTxEntry<K, V>>emptyList(),
+ tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry<K, V>>emptyList(),
tx.writeEntries());
markInitialized();
@@ -450,8 +451,8 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
* @throws IgniteCheckedException If transaction is group-lock and some key was mapped to the local node.
*/
private void prepare(
- Iterable<GridCacheTxEntry<K, V>> reads,
- Iterable<GridCacheTxEntry<K, V>> writes
+ Iterable<IgniteTxEntry<K, V>> reads,
+ Iterable<IgniteTxEntry<K, V>> writes
) throws IgniteCheckedException {
assert tx.optimistic();
@@ -482,7 +483,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
// Assign keys to primary nodes.
GridDistributedTxMapping<K, V> cur = null;
- for (GridCacheTxEntry<K, V> read : reads) {
+ for (IgniteTxEntry<K, V> read : reads) {
GridDistributedTxMapping<K, V> updated = map(read, topVer, cur);
if (cur != updated) {
@@ -499,7 +500,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
}
}
- for (GridCacheTxEntry<K, V> write : writes) {
+ for (IgniteTxEntry<K, V> write : writes) {
GridDistributedTxMapping<K, V> updated = map(write, topVer, cur);
if (cur != updated) {
@@ -565,7 +566,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
tx.subjectId(),
tx.taskNameHash());
- for (GridCacheTxEntry<K, V> txEntry : m.writes()) {
+ for (IgniteTxEntry<K, V> txEntry : m.writes()) {
if (txEntry.op() == TRANSFORM)
req.addDhtVersion(txEntry.txKey(), null);
}
@@ -589,21 +590,21 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
// At this point, if any new node joined, then it is
// waiting for this transaction to complete, so
// partition reassignments are not possible here.
- IgniteFuture<GridCacheTxEx<K, V>> fut = cctx.tm().txHandler().prepareTx(n.id(), tx, req);
+ IgniteFuture<IgniteTxEx<K, V>> fut = cctx.tm().txHandler().prepareTx(n.id(), tx, req);
// Add new future.
add(new GridEmbeddedFuture<>(
cctx.kernalContext(),
fut,
- new C2<GridCacheTxEx<K, V>, Exception, GridCacheTxEx<K, V>>() {
- @Override public GridCacheTxEx<K, V> apply(GridCacheTxEx<K, V> t, Exception ex) {
+ new C2<IgniteTxEx<K, V>, Exception, IgniteTxEx<K, V>>() {
+ @Override public IgniteTxEx<K, V> apply(IgniteTxEx<K, V> t, Exception ex) {
if (ex != null) {
onError(n.id(), mappings, ex);
return t;
}
- GridCacheTxLocalEx<K, V> dhtTx = (GridCacheTxLocalEx<K, V>)t;
+ IgniteTxLocalEx<K, V> dhtTx = (IgniteTxLocalEx<K, V>)t;
Collection<Integer> invalidParts = dhtTx.invalidPartitions();
@@ -616,7 +617,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
GridCacheVersion min = dhtTx.minVersion();
- GridCacheTxManager<K, V> tm = cctx.tm();
+ IgniteTxManager<K, V> tm = cctx.tm();
tx.readyNearLocks(m, Collections.<GridCacheVersion>emptyList(),
tm.committedVersions(min), tm.rolledbackVersions(min));
@@ -657,7 +658,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
* @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key.
* @return Mapping.
*/
- private GridDistributedTxMapping<K, V> map(GridCacheTxEntry<K, V> entry, long topVer,
+ private GridDistributedTxMapping<K, V> map(IgniteTxEntry<K, V> entry, long topVer,
GridDistributedTxMapping<K, V> cur) throws IgniteCheckedException {
GridCacheContext<K, V> cacheCtx = entry.context();
@@ -725,7 +726,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
* Mini-future for get operations. Mini-futures are only waiting on a single
* node as opposed to multiple nodes.
*/
- private class MiniFuture extends GridFutureAdapter<GridCacheTxEx<K, V>> {
+ private class MiniFuture extends GridFutureAdapter<IgniteTxEx<K, V>> {
/** */
private static final long serialVersionUID = 0L;
@@ -831,8 +832,8 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
else {
assert F.isEmpty(res.invalidPartitions());
- for (Map.Entry<GridCacheTxKey<K>, GridTuple3<GridCacheVersion, V, byte[]>> entry : res.ownedValues().entrySet()) {
- GridCacheTxEntry<K, V> txEntry = tx.entry(entry.getKey());
+ for (Map.Entry<IgniteTxKey<K>, GridTuple3<GridCacheVersion, V, byte[]>> entry : res.ownedValues().entrySet()) {
+ IgniteTxEntry<K, V> txEntry = tx.entry(entry.getKey());
assert txEntry != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 431e134..46e1b8a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -13,6 +13,7 @@ import org.apache.ignite.lang.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.gridgain.grid.kernal.processors.cache.transactions.*;
import org.gridgain.grid.util.direct.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
@@ -79,10 +80,10 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
public GridNearTxPrepareRequest(
IgniteUuid futId,
long topVer,
- GridCacheTxEx<K, V> tx,
- Collection<GridCacheTxEntry<K, V>> reads,
- Collection<GridCacheTxEntry<K, V>> writes,
- GridCacheTxKey grpLockKey,
+ IgniteTxEx<K, V> tx,
+ Collection<IgniteTxEntry<K, V>> reads,
+ Collection<IgniteTxEntry<K, V>> writes,
+ IgniteTxKey grpLockKey,
boolean partLock,
boolean near,
Map<UUID, Collection<UUID>> txNodes,
@@ -182,13 +183,13 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
* @param c Collection of entries to clone.
* @return Cloned collection.
*/
- private Collection<GridCacheTxEntry<K, V>> cloneEntries(Collection<GridCacheTxEntry<K, V>> c) {
+ private Collection<IgniteTxEntry<K, V>> cloneEntries(Collection<IgniteTxEntry<K, V>> c) {
if (F.isEmpty(c))
return c;
- Collection<GridCacheTxEntry<K, V>> cp = new ArrayList<>(c.size());
+ Collection<IgniteTxEntry<K, V>> cp = new ArrayList<>(c.size());
- for (GridCacheTxEntry<K, V> e : c) {
+ for (IgniteTxEntry<K, V> e : c) {
GridCacheContext<K, V> cacheCtx = e.context();
// Clone only if it is a near cache.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index 08d7967..e453388 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -14,6 +14,7 @@ import org.apache.ignite.lang.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.gridgain.grid.kernal.processors.cache.transactions.*;
import org.gridgain.grid.util.direct.*;
import org.gridgain.grid.util.lang.*;
import org.gridgain.grid.util.tostring.*;
@@ -53,7 +54,7 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes
/** Map of owned values to set on near node. */
@GridToStringInclude
@GridDirectTransient
- private Map<GridCacheTxKey<K>, GridTuple3<GridCacheVersion, V, byte[]>> ownedVals;
+ private Map<IgniteTxKey<K>, GridTuple3<GridCacheVersion, V, byte[]>> ownedVals;
/** Marshalled owned bytes. */
@GridToStringExclude
@@ -136,7 +137,7 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes
* @param val Value.
* @param valBytes Value bytes.
*/
- public void addOwnedValue(GridCacheTxKey<K> key, GridCacheVersion ver, V val, byte[] valBytes) {
+ public void addOwnedValue(IgniteTxKey<K> key, GridCacheVersion ver, V val, byte[] valBytes) {
if (ownedVals == null)
ownedVals = new HashMap<>();
@@ -146,8 +147,8 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes
/**
* @return Owned values map.
*/
- public Map<GridCacheTxKey<K>, GridTuple3<GridCacheVersion, V, byte[]>> ownedValues() {
- return ownedVals == null ? Collections.<GridCacheTxKey<K>, GridTuple3<GridCacheVersion,V,byte[]>>emptyMap() :
+ public Map<IgniteTxKey<K>, GridTuple3<GridCacheVersion, V, byte[]>> ownedValues() {
+ return ownedVals == null ? Collections.<IgniteTxKey<K>, GridTuple3<GridCacheVersion,V,byte[]>>emptyMap() :
Collections.unmodifiableMap(ownedVals);
}
@@ -155,7 +156,7 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes
* @param key Key.
* @return {@code True} if response has owned value for given key.
*/
- public boolean hasOwnedValue(GridCacheTxKey<K> key) {
+ public boolean hasOwnedValue(IgniteTxKey<K> key) {
return ownedVals != null && ownedVals.containsKey(key);
}
@@ -174,7 +175,7 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes
if (ownedVals != null && ownedValsBytes == null) {
ownedValsBytes = new ArrayList<>(ownedVals.size());
- for (Map.Entry<GridCacheTxKey<K>, GridTuple3<GridCacheVersion, V, byte[]>> entry : ownedVals.entrySet()) {
+ for (Map.Entry<IgniteTxKey<K>, GridTuple3<GridCacheVersion, V, byte[]>> entry : ownedVals.entrySet()) {
GridTuple3<GridCacheVersion, V, byte[]> tup = entry.getValue();
boolean rawBytes = false;
@@ -204,7 +205,7 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes
ownedVals = new HashMap<>();
for (byte[] bytes : ownedValsBytes) {
- GridTuple4<GridCacheTxKey<K>, GridCacheVersion, byte[], Boolean> tup = ctx.marshaller().unmarshal(bytes, ldr);
+ GridTuple4<IgniteTxKey<K>, GridCacheVersion, byte[], Boolean> tup = ctx.marshaller().unmarshal(bytes, ldr);
V val = tup.get4() ? (V)tup.get3() : ctx.marshaller().<V>unmarshal(tup.get3(), ldr);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
index 99d17eb..fbec6dd 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -13,6 +13,7 @@ import org.apache.ignite.*;
import org.apache.ignite.transactions.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.gridgain.grid.kernal.processors.cache.transactions.*;
import org.gridgain.grid.util.*;
import org.gridgain.grid.util.tostring.*;
import org.gridgain.grid.util.typedef.*;
@@ -32,7 +33,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
private static final long serialVersionUID = 0L;
/** Evicted keys. */
- private Collection<GridCacheTxKey<K>> evicted = new LinkedList<>();
+ private Collection<IgniteTxKey<K>> evicted = new LinkedList<>();
/** Near node ID. */
private UUID nearNodeId;
@@ -41,7 +42,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
private GridCacheVersion nearXidVer;
/** Owned versions. */
- private Map<GridCacheTxKey<K>, GridCacheVersion> owned;
+ private Map<IgniteTxKey<K>, GridCacheVersion> owned;
/** Group lock flag. */
private boolean grpLock;
@@ -86,9 +87,9 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
IgniteTxIsolation isolation,
boolean invalidate,
long timeout,
- Collection<GridCacheTxEntry<K, V>> writeEntries,
+ Collection<IgniteTxEntry<K, V>> writeEntries,
int txSize,
- @Nullable GridCacheTxKey grpLockKey,
+ @Nullable IgniteTxKey grpLockKey,
@Nullable UUID subjId,
int taskNameHash
) throws IgniteCheckedException {
@@ -105,7 +106,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
writeEntries != null ? Math.max(txSize, writeEntries.size()) : txSize, 1.0f);
if (writeEntries != null)
- for (GridCacheTxEntry<K, V> entry : writeEntries) {
+ for (IgniteTxEntry<K, V> entry : writeEntries) {
entry.unmarshal(ctx, true, ldr);
addEntry(entry);
@@ -144,7 +145,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
boolean invalidate,
long timeout,
int txSize,
- @Nullable GridCacheTxKey grpLockKey,
+ @Nullable IgniteTxKey grpLockKey,
@Nullable UUID subjId,
int taskNameHash
) {
@@ -176,7 +177,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
}
/** {@inheritDoc} */
- @Override public GridCacheVersion ownedVersion(GridCacheTxKey<K> key) {
+ @Override public GridCacheVersion ownedVersion(IgniteTxKey<K> key) {
return owned == null ? null : owned.get(key);
}
@@ -205,7 +206,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
*
* @param vers Map of owned versions.
*/
- public void ownedVersions(Map<GridCacheTxKey<K>, GridCacheVersion> vers) {
+ public void ownedVersions(Map<IgniteTxKey<K>, GridCacheVersion> vers) {
if (F.isEmpty(vers))
return;
@@ -230,7 +231,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
/**
* @return Evicted keys.
*/
- public Collection<GridCacheTxKey<K>> evicted() {
+ public Collection<IgniteTxKey<K>> evicted() {
return evicted;
}
@@ -239,7 +240,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
*
* @param key Evicted key.
*/
- public void addEvicted(GridCacheTxKey<K> key) {
+ public void addEvicted(IgniteTxKey<K> key) {
evicted.add(key);
}
@@ -250,8 +251,8 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @param entries Entries to add.
* @throws IgniteCheckedException If failed.
*/
- public void addEntries(ClassLoader ldr, Iterable<GridCacheTxEntry<K, V>> entries) throws IgniteCheckedException {
- for (GridCacheTxEntry<K, V> entry : entries) {
+ public void addEntries(ClassLoader ldr, Iterable<IgniteTxEntry<K, V>> entries) throws IgniteCheckedException {
+ for (IgniteTxEntry<K, V> entry : entries) {
entry.unmarshal(cctx, true, ldr);
addEntry(entry);
@@ -263,7 +264,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @throws IgniteCheckedException If failed.
* @return {@code True} if entry was enlisted.
*/
- private boolean addEntry(GridCacheTxEntry<K, V> entry) throws IgniteCheckedException {
+ private boolean addEntry(IgniteTxEntry<K, V> entry) throws IgniteCheckedException {
checkInternal(entry.txKey());
GridCacheContext<K, V> cacheCtx = entry.context();
@@ -320,7 +321,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
*/
public boolean addEntry(
GridCacheContext<K, V> cacheCtx,
- GridCacheTxKey<K> key,
+ IgniteTxKey<K> key,
byte[] keyBytes,
GridCacheOperation op,
V val,
@@ -348,7 +349,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
return false;
}
else {
- GridCacheTxEntry<K, V> txEntry = new GridCacheTxEntry<>(cacheCtx, this, op, val, 0L, -1L, cached,
+ IgniteTxEntry<K, V> txEntry = new IgniteTxEntry<>(cacheCtx, this, op, val, 0L, -1L, cached,
drVer);
txEntry.keyBytes(keyBytes);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/GridCacheDrManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/GridCacheDrManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/GridCacheDrManager.java
index 439a575..8915fc1 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/GridCacheDrManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/GridCacheDrManager.java
@@ -11,6 +11,7 @@ package org.gridgain.grid.kernal.processors.cache.dr;
import org.apache.ignite.*;
import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.transactions.*;
import org.gridgain.grid.kernal.processors.dr.*;
import org.jetbrains.annotations.*;
@@ -64,7 +65,7 @@ public interface GridCacheDrManager<K, V> extends GridCacheManager<K, V> {
*/
public GridDrResolveResult<V> resolveTx(
GridCacheEntryEx<K, V> e,
- GridCacheTxEntry<K, V> txEntry,
+ IgniteTxEntry<K, V> txEntry,
GridCacheVersion newVer,
GridCacheOperation op,
V newVal,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/os/GridOsCacheDrManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/os/GridOsCacheDrManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/os/GridOsCacheDrManager.java
index cb3f407..702dd33 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/os/GridOsCacheDrManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/os/GridOsCacheDrManager.java
@@ -12,6 +12,7 @@ package org.gridgain.grid.kernal.processors.cache.dr.os;
import org.apache.ignite.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.cache.dr.*;
+import org.gridgain.grid.kernal.processors.cache.transactions.*;
import org.gridgain.grid.kernal.processors.dr.*;
import org.jetbrains.annotations.*;
@@ -80,7 +81,7 @@ public class GridOsCacheDrManager<K, V> implements GridCacheDrManager<K, V> {
/** {@inheritDoc} */
@Override public GridDrResolveResult<V> resolveTx(GridCacheEntryEx<K, V> e,
- GridCacheTxEntry<K, V> txEntry,
+ IgniteTxEntry<K, V> txEntry,
GridCacheVersion newVer,
GridCacheOperation op,
V newVal,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java
index 66fffae..9ea91b9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java
@@ -14,6 +14,7 @@ import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.transactions.*;
import org.gridgain.grid.util.future.*;
import org.gridgain.grid.util.typedef.*;
import org.jetbrains.annotations.*;
@@ -86,7 +87,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public IgniteFuture<Boolean> txLockAsync(Collection<? extends K> keys, long timeout,
- GridCacheTxLocalEx<K, V> tx, boolean isRead,
+ IgniteTxLocalEx<K, V> tx, boolean isRead,
boolean retval, IgniteTxIsolation isolation, boolean invalidate,
IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
return lockAllAsync(keys, timeout, tx, filter);
@@ -95,7 +96,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout,
IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
- GridCacheTxLocalEx<K, V> tx = ctx.tm().localTx();
+ IgniteTxLocalEx<K, V> tx = ctx.tm().localTx();
return lockAllAsync(keys, timeout, tx, filter);
}
@@ -108,7 +109,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
* @return Future.
*/
public IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout,
- @Nullable GridCacheTxLocalEx<K, V> tx, IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+ @Nullable IgniteTxLocalEx<K, V> tx, IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
if (F.isEmpty(keys))
return new GridFinishedFuture<>(ctx.kernalContext(), true);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCacheEntry.java
index f5ff9aa..58b4cf0 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCacheEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCacheEntry.java
@@ -10,6 +10,7 @@
package org.gridgain.grid.kernal.processors.cache.local;
import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.transactions.*;
import org.jetbrains.annotations.*;
import static org.apache.ignite.events.IgniteEventType.*;
@@ -170,7 +171,7 @@ public class GridLocalCacheEntry<K, V> extends GridCacheMapEntry<K, V> {
}
/** {@inheritDoc} */
- @Override public boolean tmLock(GridCacheTxEx<K, V> tx, long timeout) throws GridCacheEntryRemovedException {
+ @Override public boolean tmLock(IgniteTxEx<K, V> tx, long timeout) throws GridCacheEntryRemovedException {
GridCacheMvccCandidate<K> cand = addLocal(
tx.threadId(),
tx.xidVersion(),
@@ -266,7 +267,7 @@ public class GridLocalCacheEntry<K, V> extends GridCacheMapEntry<K, V> {
*
* @param tx Transaction to unlock.
*/
- @Override public void txUnlock(GridCacheTxEx<K, V> tx) throws GridCacheEntryRemovedException {
+ @Override public void txUnlock(IgniteTxEx<K, V> tx) throws GridCacheEntryRemovedException {
removeLock(tx.xidVersion());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalLockFuture.java
index 09bb220..25a13c5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalLockFuture.java
@@ -12,9 +12,9 @@ package org.gridgain.grid.kernal.processors.cache.local;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.transactions.*;
import org.gridgain.grid.kernal.processors.timeout.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.gridgain.grid.util.future.*;
@@ -76,7 +76,7 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
private IgnitePredicate<GridCacheEntry<K, V>>[] filter;
/** Transaction. */
- private GridCacheTxLocalEx<K, V> tx;
+ private IgniteTxLocalEx<K, V> tx;
/** Trackable flag. */
private boolean trackable = true;
@@ -99,7 +99,7 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
GridLocalLockFuture(
GridCacheContext<K, V> cctx,
Collection<? extends K> keys,
- GridCacheTxLocalEx<K, V> tx,
+ IgniteTxLocalEx<K, V> tx,
GridLocalCache<K, V> cache,
long timeout,
IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
index 8ceb402..0226ff2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
@@ -13,6 +13,7 @@ import org.apache.ignite.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*;
import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.transactions.*;
import org.gridgain.grid.util.future.*;
import org.gridgain.grid.util.tostring.*;
import org.jetbrains.annotations.*;
@@ -26,7 +27,7 @@ import static org.apache.ignite.transactions.IgniteTxState.*;
/**
* Local cache transaction.
*/
-class GridLocalTx<K, V> extends GridCacheTxLocalAdapter<K, V> {
+class GridLocalTx<K, V> extends IgniteTxLocalAdapter<K, V> {
/** */
private static final long serialVersionUID = 0L;
@@ -65,7 +66,7 @@ class GridLocalTx<K, V> extends GridCacheTxLocalAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public IgniteFuture<GridCacheTxEx<K, V>> future() {
+ @Override public IgniteFuture<IgniteTxEx<K, V>> future() {
return fut.get();
}
@@ -103,11 +104,11 @@ class GridLocalTx<K, V> extends GridCacheTxLocalAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public IgniteFuture<GridCacheTxEx<K, V>> prepareAsync() {
+ @Override public IgniteFuture<IgniteTxEx<K, V>> prepareAsync() {
try {
prepare();
- return new GridFinishedFuture<GridCacheTxEx<K, V>>(cctx.kernalContext(), this);
+ return new GridFinishedFuture<IgniteTxEx<K, V>>(cctx.kernalContext(), this);
}
catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(cctx.kernalContext(), e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTxFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTxFuture.java
index e588cba..ff1682d 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTxFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTxFuture.java
@@ -14,6 +14,7 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*;
import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.transactions.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.gridgain.grid.util.future.*;
import org.gridgain.grid.util.tostring.*;
@@ -27,8 +28,8 @@ import static org.apache.ignite.transactions.IgniteTxState.*;
/**
* Replicated cache transaction future.
*/
-final class GridLocalTxFuture<K, V> extends GridFutureAdapter<GridCacheTxEx<K, V>>
- implements GridCacheMvccFuture<K, V, GridCacheTxEx<K, V>> {
+final class GridLocalTxFuture<K, V> extends GridFutureAdapter<IgniteTxEx<K, V>>
+ implements GridCacheMvccFuture<K, V, IgniteTxEx<K, V>> {
/** */
private static final long serialVersionUID = 0L;
@@ -177,7 +178,7 @@ final class GridLocalTxFuture<K, V> extends GridFutureAdapter<GridCacheTxEx<K, V
*/
@SuppressWarnings({"ThrowableInstanceNeverThrown"})
void checkLocks() {
- for (GridCacheTxEntry<K, V> txEntry : tx.writeMap().values()) {
+ for (IgniteTxEntry<K, V> txEntry : tx.writeMap().values()) {
while (true) {
try {
GridCacheEntryEx<K, V> entry = txEntry.cached();
@@ -226,7 +227,7 @@ final class GridLocalTxFuture<K, V> extends GridFutureAdapter<GridCacheTxEx<K, V
if (log.isDebugEnabled())
log.debug("Transaction future received owner changed callback [owner=" + owner + ", entry=" + entry + ']');
- for (GridCacheTxEntry<K, V> txEntry : tx.writeMap().values()) {
+ for (IgniteTxEntry<K, V> txEntry : tx.writeMap().values()) {
while (true) {
try {
GridCacheEntryEx<K,V> cached = txEntry.cached();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 3a63fa1..be53cb3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -16,6 +16,7 @@ import org.apache.ignite.transactions.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.cache.local.*;
+import org.gridgain.grid.kernal.processors.cache.transactions.*;
import org.gridgain.grid.util.*;
import org.gridgain.grid.util.future.*;
import org.gridgain.grid.util.typedef.*;
@@ -1248,7 +1249,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public IgniteFuture<Boolean> txLockAsync(Collection<? extends K> keys,
long timeout,
- GridCacheTxLocalEx<K, V> tx,
+ IgniteTxLocalEx<K, V> tx,
boolean isRead,
boolean retval,
IgniteTxIsolation isolation,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java
index 7e132a4..2f36aa8 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java
@@ -116,7 +116,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
throw new IllegalArgumentException("SERIALIZABLE isolation level is disabled (to enable change " +
"'txSerializableEnabled' configuration property)");
- GridCacheTxEx<K, V> tx = (GridCacheTxEx<K, V>)cctx.tm().userTx();
+ IgniteTxEx<K, V> tx = (IgniteTxEx<K, V>)cctx.tm().userTx();
if (tx != null)
throw new IllegalStateException("Failed to start new transaction " +
@@ -194,7 +194,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
throw new IllegalStateException("Failed to start new transaction " +
"(current thread already has a transaction): " + tx);
- GridCacheTxLocalAdapter<K, V> tx0 = cctx.tm().newTx(
+ IgniteTxLocalAdapter<K, V> tx0 = cctx.tm().newTx(
false,
false,
sys,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
new file mode 100644
index 0000000..a9d28f4
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
@@ -0,0 +1,1523 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.transactions;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.grid.util.future.*;
+import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.tostring.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+import static org.apache.ignite.events.IgniteEventType.*;
+import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+import static org.apache.ignite.transactions.IgniteTxState.*;
+import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
+
+/**
+ * Managed transaction adapter.
+ */
+public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
+ implements IgniteTxEx<K, V>, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Static logger to avoid re-creation. */
+ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Logger. */
+ protected static IgniteLogger log;
+
+ /** Transaction ID. */
+ @GridToStringInclude
+ protected GridCacheVersion xidVer;
+
+ /** Entries write version. */
+ @GridToStringInclude
+ protected GridCacheVersion writeVer;
+
+ /** Implicit flag. */
+ @GridToStringInclude
+ protected boolean implicit;
+
+ /** Implicit with one key flag. */
+ @GridToStringInclude
+ protected boolean implicitSingle;
+
+ /** Local flag. */
+ @GridToStringInclude
+ protected boolean loc;
+
+ /** Thread ID. */
+ @GridToStringInclude
+ protected long threadId;
+
+ /** Transaction start time. */
+ @GridToStringInclude
+ protected long startTime = U.currentTimeMillis();
+
+ /** Node ID. */
+ @GridToStringInclude
+ protected UUID nodeId;
+
+ /** Transaction counter value at the start of transaction. */
+ @GridToStringInclude
+ protected GridCacheVersion startVer;
+
+ /** Cache registry. */
+ @GridToStringExclude
+ protected GridCacheSharedContext<K, V> cctx;
+
+ /**
+ * End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>)
+ * assigned to this transaction at the end of write phase.
+ */
+ @GridToStringInclude
+ protected GridCacheVersion endVer;
+
+ /** Isolation. */
+ @GridToStringInclude
+ protected IgniteTxIsolation isolation = READ_COMMITTED;
+
+ /** Concurrency. */
+ @GridToStringInclude
+ protected IgniteTxConcurrency concurrency = PESSIMISTIC;
+
+ /** Transaction timeout. */
+ @GridToStringInclude
+ protected long timeout;
+
+ /** Invalidate flag. */
+ protected volatile boolean invalidate;
+
+ /** Invalidation flag for system invalidations (not user-based ones). */
+ private boolean sysInvalidate;
+
+ /** Internal flag. */
+ protected boolean internal;
+
+ /** System transaction flag. */
+ private boolean sys;
+
+ /** */
+ protected boolean onePhaseCommit;
+
+ /** */
+ protected boolean syncCommit;
+
+ /** */
+ protected boolean syncRollback;
+
+ /** If this transaction contains transform entries. */
+ protected boolean transform;
+
+ /** Commit version. */
+ private AtomicReference<GridCacheVersion> commitVer = new AtomicReference<>(null);
+
+ /** Done marker. */
+ protected final AtomicBoolean isDone = new AtomicBoolean(false);
+
+ /** */
+ private AtomicReference<FinalizationStatus> finalizing = new AtomicReference<>(FinalizationStatus.NONE);
+
+ /** Preparing flag. */
+ private AtomicBoolean preparing = new AtomicBoolean();
+
+ /** */
+ private Set<Integer> invalidParts = new GridLeanSet<>();
+
+ /** Recover writes. */
+ private Collection<IgniteTxEntry<K, V>> recoveryWrites;
+
+ /**
+ * Transaction state. Note that state is not protected, as we want to
+ * always use {@link #state()} and {@link #state(IgniteTxState)}
+ * methods.
+ */
+ @GridToStringInclude
+ private volatile IgniteTxState state = ACTIVE;
+
+ /** Timed out flag. */
+ private volatile boolean timedOut;
+
+ /** */
+ protected int txSize;
+
+ /** Group lock key, if any. */
+ protected IgniteTxKey grpLockKey;
+
+ /** */
+ @GridToStringExclude
+ private AtomicReference<GridFutureAdapter<IgniteTx>> finFut = new AtomicReference<>();
+
+ /** Topology version. */
+ private AtomicLong topVer = new AtomicLong(-1);
+
+ /** Mutex. */
+ private final Lock lock = new ReentrantLock();
+
+ /** Lock condition. */
+ private final Condition cond = lock.newCondition();
+
+ /** Subject ID initiated this transaction. */
+ protected UUID subjId;
+
+ /** Task name hash code. */
+ protected int taskNameHash;
+
+ /** Task name. */
+ protected String taskName;
+
+ /** Store used flag. */
+ protected boolean storeEnabled = true;
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ */
+ protected IgniteTxAdapter() {
+ // No-op.
+ }
+
+ /**
+ * Creates a transaction adapter for a transaction started on the calling thread of the local node.
+ * The start version is taken from {@code cctx.versions().last()}, the node ID from the local
+ * discovery node and the thread ID from the current thread.
+ *
+ * @param cctx Cache registry.
+ * @param xidVer Transaction ID.
+ * @param implicit Implicit flag.
+ * @param implicitSingle Implicit with one key flag.
+ * @param loc Local flag.
+ * @param sys System transaction flag.
+ * @param concurrency Concurrency.
+ * @param isolation Isolation.
+ * @param timeout Timeout.
+ * @param invalidate Invalidate flag.
+ * @param storeEnabled Store enabled flag.
+ * @param txSize Transaction size.
+ * @param grpLockKey Group lock key if this is group-lock transaction.
+ * @param subjId Subject ID initiated this transaction (may be {@code null}).
+ * @param taskNameHash Task name hash code.
+ */
+ protected IgniteTxAdapter(
+ GridCacheSharedContext<K, V> cctx,
+ GridCacheVersion xidVer,
+ boolean implicit,
+ boolean implicitSingle,
+ boolean loc,
+ boolean sys,
+ IgniteTxConcurrency concurrency,
+ IgniteTxIsolation isolation,
+ long timeout,
+ boolean invalidate,
+ boolean storeEnabled,
+ int txSize,
+ @Nullable IgniteTxKey grpLockKey,
+ @Nullable UUID subjId,
+ int taskNameHash
+ ) {
+ assert xidVer != null;
+ assert cctx != null;
+
+ this.cctx = cctx;
+ this.xidVer = xidVer;
+ this.implicit = implicit;
+ this.implicitSingle = implicitSingle;
+ this.loc = loc;
+ this.sys = sys;
+ this.concurrency = concurrency;
+ this.isolation = isolation;
+ this.timeout = timeout;
+ this.invalidate = invalidate;
+ this.storeEnabled = storeEnabled;
+ this.txSize = txSize;
+ this.grpLockKey = grpLockKey;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+
+ // Identity of the transaction is derived from where and when it is created.
+ startVer = cctx.versions().last();
+
+ nodeId = cctx.discovery().localNode().id();
+
+ threadId = Thread.currentThread().getId();
+
+ // Note: 'log' is a static field, so this assignment is shared by all instances.
+ log = U.logger(cctx.kernalContext(), logRef, this);
+ }
+
+ /**
+ * Creates a transaction adapter from explicitly supplied node ID, thread ID and start version.
+ * The implicit, implicit-single and local flags are always set to {@code false} by this constructor;
+ * the invalidate and store-enabled flags are not touched and keep their field defaults.
+ *
+ * @param cctx Cache registry.
+ * @param nodeId Node ID.
+ * @param xidVer Transaction ID.
+ * @param startVer Start version mark.
+ * @param threadId Thread ID.
+ * @param sys System transaction flag.
+ * @param concurrency Concurrency.
+ * @param isolation Isolation.
+ * @param timeout Timeout.
+ * @param txSize Transaction size.
+ * @param grpLockKey Group lock key if this is group-lock transaction.
+ * @param subjId Subject ID initiated this transaction (may be {@code null}).
+ * @param taskNameHash Task name hash code.
+ */
+ protected IgniteTxAdapter(
+ GridCacheSharedContext<K, V> cctx,
+ UUID nodeId,
+ GridCacheVersion xidVer,
+ GridCacheVersion startVer,
+ long threadId,
+ boolean sys,
+ IgniteTxConcurrency concurrency,
+ IgniteTxIsolation isolation,
+ long timeout,
+ int txSize,
+ @Nullable IgniteTxKey grpLockKey,
+ @Nullable UUID subjId,
+ int taskNameHash
+ ) {
+ this.cctx = cctx;
+ this.nodeId = nodeId;
+ this.threadId = threadId;
+ this.xidVer = xidVer;
+ this.startVer = startVer;
+ this.sys = sys;
+ this.concurrency = concurrency;
+ this.isolation = isolation;
+ this.timeout = timeout;
+ this.txSize = txSize;
+ this.grpLockKey = grpLockKey;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+
+ implicit = false;
+ implicitSingle = false;
+ loc = false;
+
+ // Note: 'log' is a static field, so this assignment is shared by all instances.
+ log = U.logger(cctx.kernalContext(), logRef, this);
+ }
+
+ /**
+ * Acquires lock.
+ */
+ @SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
+ protected final void lock() {
+ lock.lock();
+ }
+
+ /**
+ * Releases lock.
+ */
+ protected final void unlock() {
+ lock.unlock();
+ }
+
+ /**
+ * Signals all waiters blocked in {@link #awaitSignal()}.
+ * <p>
+ * The condition is created from this transaction's lock, so the calling thread must hold that
+ * lock (see {@link #lock()}); a {@link ReentrantLock} condition throws
+ * {@link IllegalMonitorStateException} otherwise.
+ */
+ protected final void signalAll() {
+ cond.signalAll();
+ }
+
+ /**
+ * Waits for signal sent through {@link #signalAll()}.
+ * <p>
+ * The condition is created from this transaction's lock, so the calling thread must hold that
+ * lock (see {@link #lock()}); the lock is released while waiting and re-acquired before this
+ * method returns. As with any condition wait, callers should re-check their predicate in a loop.
+ *
+ * @throws InterruptedException If interrupted.
+ */
+ protected final void awaitSignal() throws InterruptedException {
+ cond.await();
+ }
+
+ /**
+ * Checks whether near cache should be updated.
+ *
+ * @return Flag indicating whether near cache should be updated.
+ */
+ protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, long topVer) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgniteTxEntry<K, V>> optimisticLockEntries() {
+ assert optimistic();
+
+ if (!groupLock())
+ return writeEntries();
+ else {
+ if (!F.isEmpty(invalidParts)) {
+ assert invalidParts.size() == 1 : "Only one partition expected for group lock transaction " +
+ "[tx=" + this + ", invalidParts=" + invalidParts + ']';
+ assert groupLockEntry() == null : "Group lock key should be rejected " +
+ "[tx=" + this + ", groupLockEntry=" + groupLockEntry() + ']';
+ assert F.isEmpty(writeMap()) : "All entries should be rejected for group lock transaction " +
+ "[tx=" + this + ", writes=" + writeMap() + ']';
+
+ return Collections.emptyList();
+ }
+
+ IgniteTxEntry<K, V> grpLockEntry = groupLockEntry();
+
+ assert grpLockEntry != null || (near() && !local()):
+ "Group lock entry was not enlisted into transaction [tx=" + this +
+ ", grpLockKey=" + groupLockKey() + ']';
+
+ return grpLockEntry == null ?
+ Collections.<IgniteTxEntry<K,V>>emptyList() :
+ Collections.singletonList(grpLockEntry);
+ }
+ }
+
+ /**
+ * @param recoveryWrites Recover write entries.
+ */
+ public void recoveryWrites(Collection<IgniteTxEntry<K, V>> recoveryWrites) {
+ this.recoveryWrites = recoveryWrites;
+ }
+
+ /**
+ * @return Recover write entries.
+ */
+ @Override public Collection<IgniteTxEntry<K, V>> recoveryWrites() {
+ return recoveryWrites;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean storeEnabled() {
+ return storeEnabled;
+ }
+
+ /**
+ * @param storeEnabled Store enabled flag.
+ */
+ public void storeEnabled(boolean storeEnabled) {
+ this.storeEnabled = storeEnabled;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean system() {
+ return sys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean storeUsed() {
+ return storeEnabled() && store() != null;
+ }
+
+    /**
+     * Resolves the store manager used by the current transaction. The manager is looked up through
+     * the first active cache ID of the transaction.
+     *
+     * @return Store manager, or {@code null} if there are no active caches or the store
+     *      of the first active cache is not configured.
+     */
+    protected GridCacheStoreManager<K, V> store() {
+        if (activeCacheIds().isEmpty())
+            return null;
+
+        int firstCacheId = F.first(activeCacheIds());
+
+        GridCacheStoreManager<K, V> mgr = cctx.cacheContext(firstCacheId).store();
+
+        if (mgr.configured())
+            return mgr;
+
+        return null;
+    }
+
+ /**
+ * This method uses unchecked assignment to cast group lock key entry to transaction generic signature.
+ *
+ * @return Group lock tx entry.
+ */
+ @SuppressWarnings("unchecked")
+ public IgniteTxEntry<K, V> groupLockEntry() {
+ return ((IgniteTxAdapter)this).entry(groupLockKey());
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID otherNodeId() {
+ return null;
+ }
+
+    /** {@inheritDoc} */
+    @Override public UUID subjectId() {
+        // Fall back to the originating node when no explicit subject was provided.
+        return subjId != null ? subjId : originatingNodeId();
+    }
+
+ /** {@inheritDoc} */
+ @Override public int taskNameHash() {
+ return taskNameHash;
+ }
+
+    /** {@inheritDoc} */
+    @Override public long topologyVersion() {
+        long ver = topVer.get();
+
+        // -1 means the version has not been fixed for this transaction yet.
+        return ver == -1 ? cctx.exchange().topologyVersion() : ver;
+    }
+
+ /** {@inheritDoc} */
+ @Override public long topologyVersion(long topVer) {
+ this.topVer.compareAndSet(-1, topVer);
+
+ return this.topVer.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasTransforms() {
+ return transform;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean markPreparing() {
+ return preparing.compareAndSet(false, true);
+ }
+
+ /**
+ * @return {@code True} if marked.
+ */
+ @Override public boolean markFinalizing(FinalizationStatus status) {
+ boolean res;
+
+ switch (status) {
+ case USER_FINISH:
+ res = finalizing.compareAndSet(FinalizationStatus.NONE, FinalizationStatus.USER_FINISH);
+
+ break;
+
+ case RECOVERY_WAIT:
+ finalizing.compareAndSet(FinalizationStatus.NONE, FinalizationStatus.RECOVERY_WAIT);
+
+ FinalizationStatus cur = finalizing.get();
+
+ res = cur == FinalizationStatus.RECOVERY_WAIT || cur == FinalizationStatus.RECOVERY_FINISH;
+
+ break;
+
+ case RECOVERY_FINISH:
+ FinalizationStatus old = finalizing.get();
+
+ res = old != FinalizationStatus.USER_FINISH && finalizing.compareAndSet(old, status);
+
+ break;
+
+ default:
+ throw new IllegalArgumentException("Cannot set finalization status: " + status);
+
+ }
+
+ if (res) {
+ if (log.isDebugEnabled())
+ log.debug("Marked transaction as finalized: " + this);
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Transaction was not marked finalized: " + this);
+ }
+
+ return res;
+ }
+
+ /**
+ * @return Finalization status.
+ */
+ protected FinalizationStatus finalizationStatus() {
+ return finalizing.get();
+ }
+
+ /**
+ * @return {@code True} if transaction has at least one key enlisted.
+ */
+ public abstract boolean isStarted();
+
+ /** {@inheritDoc} */
+ @Override public boolean groupLock() {
+ return grpLockKey != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTxKey groupLockKey() {
+ return grpLockKey;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return txSize;
+ }
+
+ /**
+ * @return Logger.
+ */
+ protected IgniteLogger log() {
+ return log;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean near() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean implicit() {
+ return implicit;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean implicitSingle() {
+ return implicitSingle;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean local() {
+ return loc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final boolean user() {
+ return !implicit() && local() && !dht() && !internal();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean dht() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean colocated() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean replicated() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean enforceSerializable() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean syncCommit() {
+ return syncCommit;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean syncRollback() {
+ return syncRollback;
+ }
+
+ /**
+ * @param syncCommit Synchronous commit flag.
+ */
+ public void syncCommit(boolean syncCommit) {
+ this.syncCommit = syncCommit;
+ }
+
+ /**
+ * @param syncRollback Synchronous rollback flag.
+ */
+ public void syncRollback(boolean syncRollback) {
+ this.syncRollback = syncRollback;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid xid() {
+ return xidVer.asGridUuid();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<Integer> invalidPartitions() {
+ return invalidParts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int part) {
+ invalidParts.add(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Added invalid partition for transaction [part=" + part + ", tx=" + this + ']');
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion ownedVersion(IgniteTxKey<K> key) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long startTime() {
+ return startTime;
+ }
+
+    /**
+     * Gets remaining allowed transaction time.
+     *
+     * @return {@code -1} if the transaction has no positive timeout, {@code 0} if the timeout
+     *      has already elapsed, remaining time otherwise.
+     */
+    @Override public long remainingTime() {
+        if (timeout() <= 0)
+            return -1;
+
+        long elapsed = U.currentTimeMillis() - startTime();
+
+        // Never report a negative remainder.
+        return Math.max(0L, timeout() - elapsed);
+    }
+
+    /**
+     * Translates {@link #remainingTime()} into a lock timeout value.
+     *
+     * @return {@code 0} if remaining time is negative, {@code -1} if remaining time is
+     *      {@code 0}, remaining time otherwise.
+     */
+    protected long lockTimeout() {
+        long remaining = remainingTime();
+
+        if (remaining < 0)
+            return 0;
+
+        if (remaining == 0)
+            return -1;
+
+        return remaining;
+    }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion xidVersion() {
+ return xidVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long threadId() {
+ return threadId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID nodeId() {
+ return nodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTxIsolation isolation() {
+ return isolation;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTxConcurrency concurrency() {
+ return concurrency;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long timeout() {
+ return timeout;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long timeout(long timeout) {
+ if (isStarted())
+ throw new IllegalStateException("Cannot change timeout after transaction has started: " + this);
+
+ long old = this.timeout;
+
+ this.timeout = timeout;
+
+ return old;
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For a local transaction on a non-DHT cache the entry is considered owned if it is locked by
+     * the transaction thread or by the explicit lock version recorded in the transaction entry.
+     * Otherwise the entry is owned if it has no lock candidate for this transaction's version or
+     * is locked by that version.
+     *
+     * @param entry Cache entry to check.
+     * @return {@code True} if this transaction owns the lock on the entry.
+     * @throws GridCacheEntryRemovedException If the entry was concurrently removed.
+     */
+    @SuppressWarnings("SimplifiableIfStatement")
+    @Override public boolean ownsLock(GridCacheEntryEx<K, V> entry) throws GridCacheEntryRemovedException {
+        GridCacheContext<K, V> cacheCtx = entry.context();
+
+        // May be null when the entry is not enlisted in this transaction.
+        IgniteTxEntry<K, V> txEntry = entry(entry.txKey());
+
+        GridCacheVersion explicit = txEntry == null ? null : txEntry.explicitVersion();
+
+        // Guard against null tx entry: the assertion must not fail with NPE when assertions are enabled.
+        assert txEntry == null || !txEntry.groupLockEntry() || groupLock() :
+            "Can not have group-locked tx entries in " +
+            "non-group-lock transactions [txEntry=" + txEntry + ", tx=" + this + ']';
+
+        return local() && !cacheCtx.isDht() ?
+            entry.lockedByThread(threadId()) || (explicit != null && entry.lockedBy(explicit)) :
+            // If candidate is not there, then lock was explicit.
+            // Otherwise, check if entry is owned by version.
+            !entry.hasLockCandidate(xidVersion()) || entry.lockedBy(xidVersion());
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Same ownership rules as {@link #ownsLock(GridCacheEntryEx)}, but evaluated through the
+     * {@code *Unsafe} variants of the entry lock checks.
+     *
+     * @param entry Cache entry to check.
+     * @return {@code True} if this transaction owns the lock on the entry.
+     */
+    @SuppressWarnings("SimplifiableIfStatement")
+    @Override public boolean ownsLockUnsafe(GridCacheEntryEx<K, V> entry) {
+        GridCacheContext<K, V> cacheCtx = entry.context();
+
+        // May be null when the entry is not enlisted in this transaction.
+        IgniteTxEntry<K, V> txEntry = entry(entry.txKey());
+
+        GridCacheVersion explicit = txEntry == null ? null : txEntry.explicitVersion();
+
+        // Guard against null tx entry: the assertion must not fail with NPE when assertions are enabled.
+        assert txEntry == null || !txEntry.groupLockEntry() || groupLock() :
+            "Can not have group-locked tx entries in " +
+            "non-group-lock transactions [txEntry=" + txEntry + ", tx=" + this + ']';
+
+        return local() && !cacheCtx.isDht() ?
+            entry.lockedByThreadUnsafe(threadId()) || (explicit != null && entry.lockedByUnsafe(explicit)) :
+            // If candidate is not there, then lock was explicit.
+            // Otherwise, check if entry is owned by version.
+            !entry.hasLockCandidateUnsafe(xidVersion()) || entry.lockedByUnsafe(xidVersion());
+    }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTxState state() {
+ return state;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean setRollbackOnly() {
+ return state(MARKED_ROLLBACK);
+ }
+
+ /**
+ * @return {@code True} if rollback only flag is set.
+ */
+ @Override public boolean isRollbackOnly() {
+ return state == MARKED_ROLLBACK || state == ROLLING_BACK || state == ROLLED_BACK;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean done() {
+ return isDone.get();
+ }
+
+ /**
+ * @return Commit version.
+ */
+ @Override public GridCacheVersion commitVersion() {
+ initCommitVersion();
+
+ return commitVer.get();
+ }
+
+ /**
+ * @param commitVer Commit version.
+ * @return {@code True} if set to not null value.
+ */
+ @Override public boolean commitVersion(GridCacheVersion commitVer) {
+ return commitVer != null && this.commitVer.compareAndSet(null, commitVer);
+ }
+
+ /**
+ *
+ */
+ public void initCommitVersion() {
+ if (commitVer.get() == null)
+ commitVer.compareAndSet(null, xidVer);
+ }
+
+ /**
+ *
+ */
+ @Override public void close() throws IgniteCheckedException {
+ IgniteTxState state = state();
+
+ if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED)
+ rollback();
+
+ awaitCompletion();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean needsCompletedVersions() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void completedVersions(GridCacheVersion base, Collection<GridCacheVersion> committed,
+ Collection<GridCacheVersion> txs) {
+ /* No-op. */
+ }
+
+ /**
+ * Awaits transaction completion: blocks under the transaction lock until {@link #done()}
+ * returns {@code true}.
+ * <p>
+ * If the waiting thread is interrupted, its interrupt flag is restored; an exception is thrown
+ * only when the transaction is still not done at that point.
+ *
+ * @throws IgniteCheckedException If waiting failed.
+ */
+ protected void awaitCompletion() throws IgniteCheckedException {
+ lock();
+
+ try {
+ // Re-check the predicate after every wake-up.
+ while (!done())
+ awaitSignal();
+ }
+ catch (InterruptedException e) {
+ // Preserve the interrupt status for the caller.
+ Thread.currentThread().interrupt();
+
+ if (!done())
+ throw new IgniteCheckedException("Got interrupted while waiting for transaction to complete: " + this, e);
+ }
+ finally {
+ unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean internal() {
+ return internal;
+ }
+
+    /**
+     * Checks whether the given key is an internal one and, if so, marks this transaction as internal.
+     *
+     * @param key Key.
+     * @return {@code True} if key is internal.
+     */
+    protected boolean checkInternal(IgniteTxKey<K> key) {
+        boolean isInternal = key.key() instanceof GridCacheInternal;
+
+        // The flag is only ever raised here, never cleared.
+        if (isInternal)
+            internal = true;
+
+        return isInternal;
+    }
+
+ /**
+ * @param onePhaseCommit {@code True} if transaction commit should be performed in short-path way.
+ */
+ public void onePhaseCommit(boolean onePhaseCommit) {
+ this.onePhaseCommit = onePhaseCommit;
+ }
+
+ /**
+ * @return Fast commit flag.
+ */
+ @Override public boolean onePhaseCommit() {
+ return onePhaseCommit;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean optimistic() {
+ return concurrency == OPTIMISTIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean pessimistic() {
+ return concurrency == PESSIMISTIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean serializable() {
+ return isolation == SERIALIZABLE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean repeatableRead() {
+ return isolation == REPEATABLE_READ;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readCommitted() {
+ return isolation == READ_COMMITTED;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean state(IgniteTxState state) {
+ return state(state, false);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The future is created lazily on first call and published with a compare-and-set, so concurrent
+ * callers end up with the same instance. If the transaction is already done when this method is
+ * called, the future is completed with this transaction before it is returned.
+ */
+ @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
+ @Override public IgniteFuture<IgniteTx> finishFuture() {
+ GridFutureAdapter<IgniteTx> fut = finFut.get();
+
+ if (fut == null) {
+ fut = new GridFutureAdapter<IgniteTx>(cctx.kernalContext()) {
+ @Override public String toString() {
+ return S.toString(GridFutureAdapter.class, this, "tx", IgniteTxAdapter.this);
+ }
+ };
+
+ // Another thread may have published its future first; use that one in this case.
+ if (!finFut.compareAndSet(null, fut))
+ fut = finFut.get();
+ }
+
+ assert fut != null;
+
+ // Checked after the future is published: the state change code sets the done flag before
+ // reading the future reference, so one of the two sides completes the future.
+ if (isDone.get())
+ fut.onDone(this);
+
+ return fut;
+ }
+
+ /**
+ * Attempts to move the transaction into the given state. The transition is validated against
+ * the current state under the transaction lock; on success the new state and the timed-out flag
+ * are stored and waiters are signalled. After the lock is released the finish future (if one has
+ * been created) is completed when the done flag was flipped by this call, and for a valid
+ * transition the transaction maps are sealed and the transaction manager is notified.
+ *
+ * @param state State to set.
+ * @param timedOut Timeout flag.
+ * @return {@code True} if state changed.
+ */
+ @SuppressWarnings({"TooBroadScope"})
+ private boolean state(IgniteTxState state, boolean timedOut) {
+ boolean valid = false;
+
+ IgniteTxState prev;
+
+ // Set when this call is the one that flipped the done flag.
+ boolean notify = false;
+
+ lock();
+
+ try {
+ prev = this.state;
+
+ switch (state) {
+ case ACTIVE: {
+ valid = false;
+
+ break;
+ } // Active is initial state and cannot be transitioned to.
+ case PREPARING: {
+ valid = prev == ACTIVE;
+
+ break;
+ }
+ case PREPARED: {
+ valid = prev == PREPARING;
+
+ break;
+ }
+ case COMMITTING: {
+ valid = prev == PREPARED;
+
+ break;
+ }
+
+ // For UNKNOWN, COMMITTED and ROLLED_BACK the done flag is flipped before the transition
+ // is validated, i.e. regardless of whether the transition turns out to be valid.
+ // NOTE(review): waiters are signalled only for valid transitions below, so a thread in
+ // awaitCompletion() may not be woken if done is set by an invalid transition - confirm.
+ case UNKNOWN: {
+ if (isDone.compareAndSet(false, true))
+ notify = true;
+
+ valid = prev == ROLLING_BACK || prev == COMMITTING;
+
+ break;
+ }
+
+ case COMMITTED: {
+ if (isDone.compareAndSet(false, true))
+ notify = true;
+
+ valid = prev == COMMITTING;
+
+ break;
+ }
+
+ case ROLLED_BACK: {
+ if (isDone.compareAndSet(false, true))
+ notify = true;
+
+ valid = prev == ROLLING_BACK;
+
+ break;
+ }
+
+ case MARKED_ROLLBACK: {
+ valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED || prev == COMMITTING;
+
+ break;
+ }
+
+ case ROLLING_BACK: {
+ // Rolling back from COMMITTING is allowed only for local non-DHT transactions.
+ valid =
+ prev == ACTIVE || prev == MARKED_ROLLBACK || prev == PREPARING ||
+ prev == PREPARED || (prev == COMMITTING && local() && !dht());
+
+ break;
+ }
+ }
+
+ if (valid) {
+ this.state = state;
+ this.timedOut = timedOut;
+
+ if (log.isDebugEnabled())
+ log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']');
+
+ // Notify of state change.
+ signalAll();
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Invalid transaction state transition [invalid=" + state + ", cur=" + this.state +
+ ", tx=" + this + ']');
+ }
+ }
+ finally {
+ unlock();
+ }
+
+ // Complete the finish future outside of the lock.
+ if (notify) {
+ GridFutureAdapter<IgniteTx> fut = finFut.get();
+
+ if (fut != null)
+ fut.onDone(this);
+ }
+
+ if (valid) {
+ // Seal transactions maps.
+ if (state != ACTIVE)
+ seal();
+
+ cctx.tm().onTxStateChange(prev, state, this);
+ }
+
+ return valid;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion startVersion() {
+ return startVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion endVersion() {
+ return endVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void endVersion(GridCacheVersion endVer) {
+ this.endVer = endVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion writeVersion() {
+ return writeVer == null ? commitVersion() : writeVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeVersion(GridCacheVersion writeVer) {
+ this.writeVer = writeVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid timeoutId() {
+ return xidVer.asGridUuid();
+ }
+
+    /** {@inheritDoc} */
+    @Override public long endTime() {
+        // Zero timeout means the transaction never expires.
+        if (timeout == 0)
+            return Long.MAX_VALUE;
+
+        long deadline = startTime + timeout;
+
+        // A negative sum (e.g. on overflow) is treated as "never expires" as well.
+        return deadline < 0 ? Long.MAX_VALUE : deadline;
+    }
+
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
+ state(MARKED_ROLLBACK, true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean timedOut() {
+ return timedOut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void invalidate(boolean invalidate) {
+ if (isStarted() && !dht())
+ throw new IllegalStateException("Cannot change invalidation flag after transaction has started: " + this);
+
+ this.invalidate = invalidate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isInvalidate() {
+ return invalidate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isSystemInvalidate() {
+ return sysInvalidate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void systemInvalidate(boolean sysInvalidate) {
+ this.sysInvalidate = sysInvalidate;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<UUID, Collection<UUID>> transactionNodes() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public GridCacheVersion nearXidVersion() {
+ return null;
+ }
+
+ /**
+ * Computes the effective operation and value for a transaction entry, applying its transform
+ * closures if there are any.
+ * <ul>
+ * <li>If the transaction is system-invalidated, the result is {@code RELOAD} (when the cache
+ * store is enabled) or {@code DELETE}, with no value.</li>
+ * <li>If the entry has no transform closures, the entry's own operation, value and value bytes
+ * are returned.</li>
+ * <li>Otherwise the closures are applied in iteration order to the entry value (or, if the
+ * entry has no value, to the value read from the cached entry); the result is {@code DELETE}
+ * for a {@code null} outcome and {@code UPDATE} otherwise, with no value bytes.</li>
+ * </ul>
+ *
+ * @param txEntry Entry to process.
+ * @param metrics {@code True} if metrics should be updated.
+ * @return Tuple containing transformation results ({@code null} only if the empty filter
+ * unexpectedly fails and assertions are disabled).
+ * @throws IgniteCheckedException If failed to get previous value for transform.
+ * @throws GridCacheEntryRemovedException If entry was concurrently deleted.
+ */
+ protected GridTuple3<GridCacheOperation, V, byte[]> applyTransformClosures(IgniteTxEntry<K, V> txEntry,
+ boolean metrics) throws GridCacheEntryRemovedException, IgniteCheckedException {
+ GridCacheContext cacheCtx = txEntry.context();
+
+ assert cacheCtx != null;
+
+ if (isSystemInvalidate())
+ return F.t(cacheCtx.isStoreEnabled() ? RELOAD : DELETE, null, null);
+ if (F.isEmpty(txEntry.transformClosures()))
+ return F.t(txEntry.op(), txEntry.value(), txEntry.valueBytes());
+ else {
+ try {
+ boolean recordEvt = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ);
+
+ // Prefer the value already carried by the tx entry; otherwise read it from the cached entry.
+ V val = txEntry.hasValue() ? txEntry.value() :
+ txEntry.cached().innerGet(this,
+ /*swap*/false,
+ /*read through*/false,
+ /*fail fast*/true,
+ /*unmarshal*/true,
+ /*metrics*/metrics,
+ /*event*/recordEvt,
+ /*temporary*/true,
+ /*subjId*/subjId,
+ /**closure name */recordEvt ? F.first(txEntry.transformClosures()) : null,
+ resolveTaskName(),
+ CU.<K, V>empty());
+
+ try {
+ // Each closure receives the result of the previous one.
+ for (IgniteClosure<V, V> clos : txEntry.transformClosures())
+ val = clos.apply(val);
+ }
+ catch (Throwable e) {
+ throw new IgniteException("Transform closure must not throw any exceptions " +
+ "(transaction will be invalidated)", e);
+ }
+
+ // A null transform result is treated as removal.
+ GridCacheOperation op = val == null ? DELETE : UPDATE;
+
+ return F.t(op, (V)cacheCtx.<V>unwrapTemporary(val), null);
+ }
+ catch (GridCacheFilterFailedException e) {
+ // The filter passed to innerGet is empty, so this is not expected to happen.
+ assert false : "Empty filter failed for innerGet: " + e;
+
+ return null;
+ }
+ }
+ }
+
+ /**
+ * @return Resolves task name.
+ */
+ public String resolveTaskName() {
+ if (taskName != null)
+ return taskName;
+
+ return (taskName = cctx.kernalContext().task().resolveTaskName(taskNameHash));
+ }
+
+ /**
+  * Checks whether a near-cache transaction entry is mapped to the local node.
+  * <p>
+  * Entries of non-near caches are never considered mapped. If the entry's node ID (or, for
+  * local transactions without an entry node ID, this transaction's node ID) equals the local
+  * node ID, the method returns {@code true} right away. Otherwise the affinity nodes of the
+  * entry's partition are looked up and the result of that lookup is also recorded on the
+  * entry through {@code e.locallyMapped(boolean)}.
+  *
+  * @param e Transaction entry.
+  * @param primaryOnly If {@code true}, only the first affinity node (primary) is checked;
+  *      otherwise any affinity node (primary or backup) counts.
+  * @return {@code True} if entry is locally mapped as a primary or back up node.
+  */
+ protected boolean isNearLocallyMapped(IgniteTxEntry<K, V> e, boolean primaryOnly) {
+     GridCacheContext<K, V> cacheCtx = e.context();
+
+     if (!cacheCtx.isNear())
+         return false;
+
+     // Prefer the primary node ID recorded on the entry; fall back to the transaction
+     // node ID for near-local transactions.
+     UUID entryNodeId = e.nodeId();
+
+     UUID mappedNodeId;
+
+     if (entryNodeId != null)
+         mappedNodeId = entryNodeId;
+     else
+         mappedNodeId = local() ? this.nodeId : null;
+
+     if (mappedNodeId != null && mappedNodeId.equals(cctx.localNodeId()))
+         return true;
+
+     GridCacheEntryEx<K, V> cached = e.cached();
+
+     int part;
+
+     if (cached != null)
+         part = cached.partition();
+     else
+         part = cacheCtx.affinity().partition(e.key());
+
+     List<ClusterNode> affNodes = cacheCtx.affinity().nodes(part, topologyVersion());
+
+     e.locallyMapped(F.contains(affNodes, cctx.localNode()));
+
+     if (!primaryOnly)
+         return e.locallyMapped();
+
+     ClusterNode primary = F.first(affNodes);
+
+     if (primary == null && !isAffinityNode(cacheCtx.config()))
+         return false;
+
+     assert primary != null : "Primary node is null for affinity nodes: " + affNodes;
+
+     return primary.isLocal();
+ }
+
+ /**
+  * Marks the near-cache entry behind a transaction entry as obsolete when the entry is
+  * mapped to the local node (see {@code isNearLocallyMapped}).
+  *
+  * @param e Entry to evict if it qualifies for eviction.
+  * @param primaryOnly Flag to try to evict only on primary node.
+  * @return {@code True} if the entry is locally mapped and {@code markObsolete(xidVer)}
+  *      succeeded on its cached entry; {@code false} otherwise.
+  * @throws IgniteCheckedException If failed.
+  */
+ protected boolean evictNearEntry(IgniteTxEntry<K, V> e, boolean primaryOnly) throws IgniteCheckedException {
+     assert e != null;
+
+     if (!isNearLocallyMapped(e, primaryOnly))
+         return false;
+
+     GridCacheEntryEx<K, V> cached = e.cached();
+
+     assert cached instanceof GridNearCacheEntry : "Invalid cache entry: " + e;
+
+     if (log.isDebugEnabled())
+         log.debug("Evicting dht-local entry from near cache [entry=" + cached + ", tx=" + this + ']');
+
+     // Null check still matters when assertions are disabled.
+     return cached != null && cached.markObsolete(xidVer);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Writes, in this order: the metadata (via {@code writeExternalMeta}), {@code xidVer},
+  * {@code invalidate}, {@code timeout}, {@code threadId}, {@code startTime}, {@code nodeId},
+  * and then the ordinals of {@code isolation}, {@code concurrency} and {@code state()}.
+  * Each ordinal is written with {@code write(int)}, i.e. as a single byte. The order must
+  * stay in sync with {@code readExternal}.
+  *
+  * @param out Output to write to.
+  * @throws IOException If writing fails.
+  */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ writeExternalMeta(out);
+
+ out.writeObject(xidVer);
+ out.writeBoolean(invalidate);
+ out.writeLong(timeout);
+ out.writeLong(threadId);
+ out.writeLong(startTime);
+
+ U.writeUuid(out, nodeId);
+
+ out.write(isolation.ordinal());
+ out.write(concurrency.ordinal());
+ out.write(state().ordinal());
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Reads fields in exactly the order {@code writeExternal} writes them: metadata (via
+  * {@code readExternalMeta}), {@code xidVer}, {@code invalidate}, {@code timeout},
+  * {@code threadId}, {@code startTime}, {@code nodeId}, and then one byte each for the
+  * isolation, concurrency and state ordinals, which are mapped back with {@code fromOrdinal}.
+  * The {@code state} field is assigned directly rather than through a state-transition method.
+  *
+  * @param in Input to read from.
+  * @throws IOException If reading fails.
+  * @throws ClassNotFoundException If the class of {@code xidVer} cannot be found.
+  */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ readExternalMeta(in);
+
+ xidVer = (GridCacheVersion)in.readObject();
+ invalidate = in.readBoolean();
+ timeout = in.readLong();
+ threadId = in.readLong();
+ startTime = in.readLong();
+
+ nodeId = U.readUuid(in);
+
+ isolation = IgniteTxIsolation.fromOrdinal(in.read());
+ concurrency = IgniteTxConcurrency.fromOrdinal(in.read());
+
+ state = IgniteTxState.fromOrdinal(in.read());
+ }
+
+ /**
+  * Reconstructs object on unmarshalling: the deserialized adapter is replaced by a
+  * {@code TxShadow} built from this instance's fields.
+  * <p>
+  * NOTE(review): {@code implicit} is passed to the shadow, but it is not among the fields
+  * that {@code readExternal} restores explicitly - confirm it is populated elsewhere
+  * (e.g. by {@code readExternalMeta}) or that the default is intended.
+  *
+  * @return Reconstructed object.
+  * @throws ObjectStreamException Thrown in case of unmarshalling error.
+  */
+ protected Object readResolve() throws ObjectStreamException {
+     IgniteUuid xid = xidVer.asGridUuid();
+     IgniteTxState curState = state();
+     boolean rollbackOnly = isRollbackOnly();
+
+     return new TxShadow(xid, nodeId, threadId, startTime, isolation, concurrency, invalidate, implicit,
+         timeout, curState, rollbackOnly);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ return o == this || (o instanceof IgniteTxAdapter && xidVer.equals(((IgniteTxAdapter)o).xidVer));
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * @return Hash code of {@code xidVer}, the same field that {@code equals} compares.
+  */
+ @Override public int hashCode() {
+ return xidVer.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return GridToStringBuilder.toString(IgniteTxAdapter.class, this,
+ "duration", (U.currentTimeMillis() - startTime) + "ms", "grpLock", groupLock(),
+ "onePhaseCommit", onePhaseCommit);
+ }
+
+ /**
+  * Transaction shadow class to be used for deserialization.
+  * <p>
+  * Holds an immutable copy of the transaction attributes passed to its constructor. All
+  * accessors return those values; every operation that would modify or complete the
+  * transaction throws {@link IllegalStateException}.
+  */
+ private static class TxShadow extends GridMetadataAwareAdapter implements IgniteTx {
+     /** */
+     private static final long serialVersionUID = 0L;
+
+     /** Message of the exception thrown by every mutating operation. */
+     private static final String READ_ONLY_MSG = "Deserialized transaction can only be used as read-only.";
+
+     /** Xid. */
+     private final IgniteUuid xid;
+
+     /** Node ID. */
+     private final UUID nodeId;
+
+     /** Thread ID. */
+     private final long threadId;
+
+     /** Start time. */
+     private final long startTime;
+
+     /** Transaction isolation. */
+     private final IgniteTxIsolation isolation;
+
+     /** Concurrency. */
+     private final IgniteTxConcurrency concurrency;
+
+     /** Invalidate flag. */
+     private final boolean invalidate;
+
+     /** Timeout. */
+     private final long timeout;
+
+     /** State. */
+     private final IgniteTxState state;
+
+     /** Rollback only flag. */
+     private final boolean rollbackOnly;
+
+     /** Implicit flag. */
+     private final boolean implicit;
+
+     /**
+      * Creates a shadow from the given attribute values; each value is stored as is.
+      *
+      * @param xid Xid.
+      * @param nodeId Node ID.
+      * @param threadId Thread ID.
+      * @param startTime Start time.
+      * @param isolation Isolation.
+      * @param concurrency Concurrency.
+      * @param invalidate Invalidate flag.
+      * @param implicit Implicit flag.
+      * @param timeout Transaction timeout.
+      * @param state Transaction state.
+      * @param rollbackOnly Rollback-only flag.
+      */
+     TxShadow(IgniteUuid xid, UUID nodeId, long threadId, long startTime, IgniteTxIsolation isolation,
+         IgniteTxConcurrency concurrency, boolean invalidate, boolean implicit, long timeout,
+         IgniteTxState state, boolean rollbackOnly) {
+         this.xid = xid;
+         this.nodeId = nodeId;
+         this.threadId = threadId;
+         this.startTime = startTime;
+         this.isolation = isolation;
+         this.concurrency = concurrency;
+         this.invalidate = invalidate;
+         this.implicit = implicit;
+         this.timeout = timeout;
+         this.state = state;
+         this.rollbackOnly = rollbackOnly;
+     }
+
+     /** {@inheritDoc} */
+     @Override public IgniteUuid xid() {
+         return xid;
+     }
+
+     /** {@inheritDoc} */
+     @Override public UUID nodeId() {
+         return nodeId;
+     }
+
+     /** {@inheritDoc} */
+     @Override public long threadId() {
+         return threadId;
+     }
+
+     /** {@inheritDoc} */
+     @Override public long startTime() {
+         return startTime;
+     }
+
+     /** {@inheritDoc} */
+     @Override public IgniteTxIsolation isolation() {
+         return isolation;
+     }
+
+     /** {@inheritDoc} */
+     @Override public IgniteTxConcurrency concurrency() {
+         return concurrency;
+     }
+
+     /** {@inheritDoc} */
+     @Override public boolean isInvalidate() {
+         return invalidate;
+     }
+
+     /** {@inheritDoc} */
+     @Override public boolean implicit() {
+         return implicit;
+     }
+
+     /** {@inheritDoc} */
+     @Override public long timeout() {
+         return timeout;
+     }
+
+     /** {@inheritDoc} */
+     @Override public IgniteTxState state() {
+         return state;
+     }
+
+     /** {@inheritDoc} */
+     @Override public boolean isRollbackOnly() {
+         return rollbackOnly;
+     }
+
+     /**
+      * Not supported on a deserialized transaction.
+      *
+      * @throws IllegalStateException Always.
+      */
+     @Override public long timeout(long timeout) {
+         throw new IllegalStateException(READ_ONLY_MSG);
+     }
+
+     /**
+      * Not supported on a deserialized transaction.
+      *
+      * @throws IllegalStateException Always.
+      */
+     @Override public boolean setRollbackOnly() {
+         throw new IllegalStateException(READ_ONLY_MSG);
+     }
+
+     /**
+      * Not supported on a deserialized transaction.
+      *
+      * @throws IllegalStateException Always.
+      */
+     @Override public void commit() {
+         throw new IllegalStateException(READ_ONLY_MSG);
+     }
+
+     /**
+      * Not supported on a deserialized transaction.
+      *
+      * @throws IllegalStateException Always.
+      */
+     @Override public void close() {
+         throw new IllegalStateException(READ_ONLY_MSG);
+     }
+
+     /**
+      * Not supported on a deserialized transaction.
+      *
+      * @throws IllegalStateException Always.
+      */
+     @Override public IgniteFuture<IgniteTx> commitAsync() {
+         throw new IllegalStateException(READ_ONLY_MSG);
+     }
+
+     /**
+      * Not supported on a deserialized transaction.
+      *
+      * @throws IllegalStateException Always.
+      */
+     @Override public void rollback() {
+         throw new IllegalStateException(READ_ONLY_MSG);
+     }
+
+     /**
+      * {@inheritDoc}
+      * <p>
+      * A shadow is equal to any {@code IgniteTx} whose {@code xid()} equals this shadow's xid.
+      */
+     @Override public boolean equals(Object o) {
+         if (this == o)
+             return true;
+
+         if (!(o instanceof IgniteTx))
+             return false;
+
+         IgniteTx other = (IgniteTx)o;
+
+         return xid.equals(other.xid());
+     }
+
+     /** {@inheritDoc} */
+     @Override public int hashCode() {
+         return xid.hashCode();
+     }
+
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(TxShadow.class, this);
+     }
+ }
+}