You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/03 03:03:16 UTC
[07/50] [abbrv] ignite git commit: Merge branch sprint-3 into
ignite-264
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 8f4700f,71e173b..9a0044f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@@ -298,7 -303,10 +299,7 @@@ public class GridNearTransactionalCache
req.taskNameHash()
);
- tx = ctx.tm().onCreated(tx);
- if (req.groupLock())
- tx.groupLockKey(txKey);
-
+ tx = ctx.tm().onCreated(null, tx);
if (tx == null || !ctx.tm().onStarted(tx))
throw new IgniteTxRollbackCheckedException("Failed to acquire lock " +
@@@ -661,12 -669,17 +661,12 @@@
if (map == null || map.isEmpty())
return;
- for (Map.Entry<ClusterNode, GridNearUnlockRequest<K, V>> mapping : map.entrySet()) {
- Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
- Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);
-
+ for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) {
ClusterNode n = mapping.getKey();
- GridDistributedUnlockRequest<K, V> req = mapping.getValue();
+ GridDistributedUnlockRequest req = mapping.getValue();
- if (!F.isEmpty(req.keyBytes()) || !F.isEmpty(req.keys())) {
+ if (!F.isEmpty(req.keys())) {
- req.completedVersions(committed, rolledback);
-
// We don't wait for reply to this message.
ctx.io().send(n, req, ctx.ioPolicy());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index e6930d0,5d1a306..a995d47
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@@ -21,9 -21,9 +21,10 @@@ import org.apache.ignite.*
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
+ import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.transactions.*;
@@@ -356,100 -321,11 +350,100 @@@ public final class GridNearTxFinishFutu
}
/**
+ *
+ */
+ private void checkBackup() {
+ assert mappings.size() <= 1;
+
+ for (UUID nodeId : mappings.keySet()) {
+ Collection<UUID> backups = tx.transactionNodes().get(nodeId);
+
+ if (!F.isEmpty(backups)) {
+ assert backups.size() == 1;
+
+ UUID backupId = F.first(backups);
+
+ ClusterNode backup = ctx.discovery().node(backupId);
+
+ // Nothing to do if backup has left the grid.
+ if (backup == null)
+ return;
+
+ MiniFuture mini = new MiniFuture(backup);
+
+ add(mini);
+
+ GridDhtTxFinishRequest<K, V> finishReq = new GridDhtTxFinishRequest<>(
+ cctx.localNodeId(),
+ futureId(),
+ mini.futureId(),
+ tx.topologyVersion(),
+ tx.xidVersion(),
+ tx.commitVersion(),
+ tx.threadId(),
+ tx.isolation(),
+ true,
+ false,
+ tx.system(),
+ false,
+ true,
+ true,
+ 0,
+ null,
+ 0);
+
+ finishReq.checkCommitted(true);
+
+ try {
+ cctx.io().send(backup, finishReq, tx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException e) {
+ mini.onResult(e);
+ }
+ catch (IgniteCheckedException e) {
+ mini.onResult(e);
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private boolean needFinishOnePhase() {
+ for (Integer cacheId : tx.activeCacheIds()) {
+ GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+
+ if (cacheCtx.isNear())
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ *
+ */
+ private void finishOnePhase() {
+ // No need to send messages as transaction was already committed on remote node.
+ // Finish local mapping only as we need send commit message to backups.
- for (GridDistributedTxMapping<K, V> m : mappings.values()) {
++ for (GridDistributedTxMapping m : mappings.values()) {
+ if (m.node().isLocal()) {
+ IgniteInternalFuture<IgniteInternalTx> fut = cctx.tm().txHandler().finishColocatedLocal(commit, tx);
+
+ // Add new future.
+ if (fut != null)
+ add(fut);
+ }
+ }
+ }
+
+ /**
* @param mappings Mappings.
*/
- private void finish(Iterable<GridDistributedTxMapping<K, V>> mappings) {
+ private void finish(Iterable<GridDistributedTxMapping> mappings) {
// Create mini futures.
- for (GridDistributedTxMapping<K, V> m : mappings)
+ for (GridDistributedTxMapping m : mappings)
finish(m);
}
@@@ -536,24 -416,12 +531,15 @@@
/** Keys. */
@GridToStringInclude
- private GridDistributedTxMapping<K, V> m;
+ private GridDistributedTxMapping m;
+ /** Backup check flag. */
+ private ClusterNode backup;
+
/**
- * Empty constructor required for {@link Externalizable}.
- */
- public MiniFuture() {
- // No-op.
- }
-
- /**
* @param m Mapping.
*/
- MiniFuture(GridDistributedTxMapping<K, V> m) {
- super(cctx.kernalContext());
-
+ MiniFuture(GridDistributedTxMapping m) {
this.m = m;
}
@@@ -614,9 -471,7 +600,9 @@@
/**
* @param res Result callback.
*/
- void onResult(GridNearTxFinishResponse<K, V> res) {
+ void onResult(GridNearTxFinishResponse res) {
+ assert backup == null;
+
if (res.error() != null)
onDone(res.error());
else
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index 6ebcb59,7b0b811..10c19e8
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@@ -83,11 -89,15 +86,23 @@@ public class GridNearTxFinishRequest ex
boolean syncRollback,
boolean explicitLock,
boolean storeEnabled,
- long topVer,
- @NotNull AffinityTopologyVersion topVer,
- GridCacheVersion baseVer,
- Collection<GridCacheVersion> committedVers,
- Collection<GridCacheVersion> rolledbackVers,
++ AffinityTopologyVersion topVer,
int txSize,
@Nullable UUID subjId,
int taskNameHash) {
- super(xidVer, futId, null, threadId, commit, invalidate, sys, syncCommit, syncRollback, txSize);
- super(xidVer, futId, null, threadId, commit, invalidate, sys, plc, syncCommit, syncRollback, baseVer,
- committedVers, rolledbackVers, txSize, null);
++ super(
++ xidVer,
++ futId,
++ null,
++ threadId,
++ commit,
++ invalidate,
++ sys,
++ plc,
++ syncCommit,
++ syncRollback,
++ txSize
++ );
this.explicitLock = explicitLock;
this.storeEnabled = storeEnabled;
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index b6bc993,1db4902..b411b99
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@@ -83,11 -80,8 +80,11 @@@ public class GridNearTxLocal extends Gr
private boolean colocatedLocallyMapped;
/** Info for entries accessed locally in optimistic transaction. */
- private Map<IgniteTxKey<K>, IgniteCacheExpiryPolicy> accessMap;
+ private Map<IgniteTxKey, IgniteCacheExpiryPolicy> accessMap;
+ /** */
+ private boolean needCheckBackup;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@@ -134,10 -134,13 +133,12 @@@
timeout,
invalidate,
storeEnabled,
+ false,
txSize,
- grpLockKey,
- partLock,
subjId,
taskNameHash);
+
+ initResult();
}
/** {@inheritDoc} */
@@@ -276,7 -269,10 +281,7 @@@
}
/** {@inheritDoc} */
- @Override public Collection<IgniteTxEntry<K, V>> optimisticLockEntries() {
+ @Override public Collection<IgniteTxEntry> optimisticLockEntries() {
- if (groupLock())
- return super.optimisticLockEntries();
-
return optimisticLockEntries;
}
@@@ -547,13 -549,22 +551,13 @@@
/**
* @param mapping Mapping to order.
- * @param pendingVers Pending versions.
- * @param committedVers Committed versions.
- * @param rolledbackVers Rolled back versions.
*/
- void readyNearLocks(GridDistributedTxMapping<K, V> mapping) {
- Collection<IgniteTxEntry<K, V>> entries = F.concat(false, mapping.reads(), mapping.writes());
- void readyNearLocks(GridDistributedTxMapping mapping,
- Collection<GridCacheVersion> pendingVers,
- Collection<GridCacheVersion> committedVers,
- Collection<GridCacheVersion> rolledbackVers)
- {
- Collection<IgniteTxEntry> entries = groupLock() ?
- Collections.singletonList(groupLockEntry()) :
- F.concat(false, mapping.reads(), mapping.writes());
++ void readyNearLocks(GridDistributedTxMapping mapping) {
++ Collection<IgniteTxEntry> entries = F.concat(false, mapping.reads(), mapping.writes());
- for (IgniteTxEntry<K, V> txEntry : entries) {
+ for (IgniteTxEntry txEntry : entries) {
while (true) {
- GridCacheContext<K, V> cacheCtx = txEntry.cached().context();
+ GridCacheContext cacheCtx = txEntry.cached().context();
assert cacheCtx.isNear();
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index d43e409,4d70afb..b7cdbb5
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@@ -196,12 -189,7 +189,12 @@@ public final class GridNearTxPrepareFut
* @param mappings Remaining mappings.
* @param e Error.
*/
- void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping<K, V>> mappings, Throwable e) {
+ void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping> mappings, Throwable e) {
+ if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) {
+ if (tx.onePhaseCommit())
+ tx.markForBackupCheck();
+ }
+
if (err.compareAndSet(null, e)) {
boolean marked = tx.setRollbackOnly();
@@@ -441,30 -430,32 +435,26 @@@
private void prepare0() {
assert tx.optimistic();
- try {
- prepare(
- tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
- tx.writeEntries());
+ prepare(
- tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry<K, V>>emptyList(),
++ tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
+ tx.writeEntries());
- markInitialized();
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
+ markInitialized();
}
/**
* @param reads Read entries.
* @param writes Write entries.
- * @throws IgniteCheckedException If transaction is group-lock and some key was mapped to to the local node.
*/
private void prepare(
- Iterable<IgniteTxEntry<K, V>> reads,
- Iterable<IgniteTxEntry<K, V>> writes
+ Iterable<IgniteTxEntry> reads,
+ Iterable<IgniteTxEntry> writes
- ) throws IgniteCheckedException {
+ ) {
assert tx.optimistic();
- GridDiscoveryTopologySnapshot snapshot = tx.topologySnapshot();
-
- assert snapshot != null;
-
- long topVer = snapshot.topologyVersion();
+ AffinityTopologyVersion topVer = tx.topologyVersion();
- assert topVer > 0;
+ assert topVer.topologyVersion() > 0;
txMapping = new GridDhtTxMapping<>();
@@@ -739,15 -737,16 +729,15 @@@
* @param entry Transaction entry.
* @param topVer Topology version.
* @param cur Current mapping.
- * @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key.
* @return Mapping.
*/
- private GridDistributedTxMapping<K, V> map(
- IgniteTxEntry<K, V> entry,
- long topVer,
- GridDistributedTxMapping<K, V> cur,
+ private GridDistributedTxMapping map(
+ IgniteTxEntry entry,
+ AffinityTopologyVersion topVer,
+ GridDistributedTxMapping cur,
boolean waitLock
- ) throws IgniteCheckedException {
+ ) {
- GridCacheContext<K, V> cacheCtx = entry.context();
+ GridCacheContext cacheCtx = entry.context();
List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer);
@@@ -763,17 -762,23 +753,17 @@@
", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']');
}
- if (tx.groupLock() && !primary.isLocal())
- throw new IgniteCheckedException("Failed to prepare group lock transaction (local node is not primary for " +
- " key)[key=" + entry.key() + ", primaryNodeId=" + primary.id() + ']');
-
// Must re-initialize cached entry while holding topology lock.
if (cacheCtx.isNear())
- entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer), entry.keyBytes());
+ entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer));
else if (!cacheCtx.isLocal())
- entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true), entry.keyBytes());
+ entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true));
else
- entry.cached(cacheCtx.local().entryEx(entry.key(), topVer), entry.keyBytes());
+ entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
if (cacheCtx.isNear() || cacheCtx.isLocal()) {
- if (waitLock && entry.explicitVersion() == null) {
- if (!tx.groupLock() || tx.groupLockKey().equals(entry.txKey()))
- lockKeys.add(entry.txKey());
- }
+ if (waitLock && entry.explicitVersion() == null)
+ lockKeys.add(entry.txKey());
}
if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 66653ad,846022c..f345e65
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@@ -93,10 -96,12 +94,10 @@@ public class GridNearTxPrepareRequest e
*/
public GridNearTxPrepareRequest(
IgniteUuid futId,
- long topVer,
- IgniteInternalTx<K, V> tx,
- Collection<IgniteTxEntry<K, V>> reads,
- Collection<IgniteTxEntry<K, V>> writes,
+ AffinityTopologyVersion topVer,
+ IgniteInternalTx tx,
+ Collection<IgniteTxEntry> reads,
+ Collection<IgniteTxEntry> writes,
- IgniteTxKey grpLockKey,
- boolean partLock,
boolean near,
Map<UUID, Collection<UUID>> txNodes,
boolean last,
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index a119288,2456674..ded5409
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@@ -310,8 -359,14 +336,8 @@@ public class GridNearTxPrepareResponse
writer.incrementState();
- case 14:
- if (!writer.writeByteArray("retValBytes", retValBytes))
- case 17:
- if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
+ case 18:
+ if (!writer.writeMessage("retVal", retVal))
return false;
writer.incrementState();
@@@ -380,8 -443,16 +414,8 @@@
reader.incrementState();
- case 14:
- retValBytes = reader.readByteArray("retValBytes");
- case 17:
- pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
+ case 18:
+ retVal = reader.readMessage("retVal");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index fd0105e,9969f65..c3a5b1d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@@ -51,8 -52,11 +52,8 @@@ public class GridNearTxRemote extends G
private GridCacheVersion nearXidVer;
/** Owned versions. */
- private Map<IgniteTxKey<K>, GridCacheVersion> owned;
+ private Map<IgniteTxKey, GridCacheVersion> owned;
- /** Group lock flag. */
- private boolean grpLock;
-
/**
* Empty constructor required for {@link Externalizable}.
*/
@@@ -92,13 -98,14 +94,27 @@@
TransactionIsolation isolation,
boolean invalidate,
long timeout,
- Collection<IgniteTxEntry<K, V>> writeEntries,
+ Collection<IgniteTxEntry> writeEntries,
int txSize,
- @Nullable IgniteTxKey grpLockKey,
@Nullable UUID subjId,
int taskNameHash
) throws IgniteCheckedException {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
- subjId, taskNameHash);
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout,
- txSize, grpLockKey, subjId, taskNameHash);
++ super(
++ ctx,
++ nodeId,
++ rmtThreadId,
++ xidVer,
++ commitVer,
++ sys,
++ plc,
++ concurrency,
++ isolation,
++ invalidate,
++ timeout,
++ txSize,
++ subjId,
++ taskNameHash
++ );
assert nearNodeId != null;
@@@ -133,9 -141,10 +150,9 @@@
* @param timeout Timeout.
* @param ctx Cache registry.
* @param txSize Expected transaction size.
- * @param grpLockKey Collection of group lock keys if this is a group-lock transaction.
*/
public GridNearTxRemote(
- GridCacheSharedContext<K, V> ctx,
+ GridCacheSharedContext ctx,
UUID nodeId,
UUID nearNodeId,
GridCacheVersion nearXidVer,
@@@ -151,8 -161,9 +169,22 @@@
@Nullable UUID subjId,
int taskNameHash
) {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
- subjId, taskNameHash);
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout,
- txSize, grpLockKey, subjId, taskNameHash);
++ super(
++ ctx,
++ nodeId,
++ rmtThreadId,
++ xidVer,
++ commitVer,
++ sys,
++ plc,
++ concurrency,
++ isolation,
++ invalidate,
++ timeout,
++ txSize,
++ subjId,
++ taskNameHash
++ );
assert nearNodeId != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index e11a5b2,acd3202..90b5ee8
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@@ -227,9 -232,10 +230,9 @@@ public abstract class IgniteTxAdapter e
* @param isolation Isolation.
* @param timeout Timeout.
* @param txSize Transaction size.
- * @param grpLockKey Group lock key if this is group-lock transaction.
*/
protected IgniteTxAdapter(
- GridCacheSharedContext<K, V> cctx,
+ GridCacheSharedContext<?, ?> cctx,
GridCacheVersion xidVer,
boolean implicit,
boolean implicitSingle,
@@@ -284,9 -293,10 +290,9 @@@
* @param isolation Isolation.
* @param timeout Timeout.
* @param txSize Transaction size.
- * @param grpLockKey Group lock key if this is group-lock transaction.
*/
protected IgniteTxAdapter(
- GridCacheSharedContext<K, V> cctx,
+ GridCacheSharedContext<?, ?> cctx,
UUID nodeId,
GridCacheVersion xidVer,
GridCacheVersion startVer,
@@@ -360,8 -385,31 +379,8 @@@
}
/** {@inheritDoc} */
- @Override public Collection<IgniteTxEntry<K, V>> optimisticLockEntries() {
+ @Override public Collection<IgniteTxEntry> optimisticLockEntries() {
- if (!groupLock())
- return writeEntries();
- else {
- if (!F.isEmpty(invalidParts)) {
- assert invalidParts.size() == 1 : "Only one partition expected for group lock transaction " +
- "[tx=" + this + ", invalidParts=" + invalidParts + ']';
- assert groupLockEntry() == null : "Group lock key should be rejected " +
- "[tx=" + this + ", groupLockEntry=" + groupLockEntry() + ']';
- assert F.isEmpty(writeMap()) : "All entries should be rejected for group lock transaction " +
- "[tx=" + this + ", writes=" + writeMap() + ']';
-
- return Collections.emptyList();
- }
-
- IgniteTxEntry grpLockEntry = groupLockEntry();
-
- assert grpLockEntry != null || (near() && !local()):
- "Group lock entry was not enlisted into transaction [tx=" + this +
- ", grpLockKey=" + groupLockKey() + ']';
-
- return grpLockEntry == null ?
- Collections.<IgniteTxEntry>emptyList() :
- Collections.singletonList(grpLockEntry);
- }
+ return writeEntries();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 22d8cf9,95d3527..c3e734c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@@ -82,11 -75,9 +75,12 @@@ public class IgniteTxEntry implements G
/** Transform. */
@GridToStringInclude
- private Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessorsCol;
+ @GridDirectTransient
+ private Collection<T2<EntryProcessor<Object, Object, Object>, Object[]>> entryProcessorsCol;
+ /** Transient field for calculated entry processor value. */
+ private V entryProcessorCalcVal;
+
/** Transform closure bytes. */
@GridToStringExclude
private byte[] transformClosBytes;
@@@ -131,15 -129,18 +132,15 @@@
private transient boolean locked;
/** Assigned node ID (required only for partitioned cache). */
- private transient UUID nodeId;
+ @GridDirectTransient
+ private UUID nodeId;
/** Flag if this node is a back up node. */
+ @GridDirectTransient
private boolean locMapped;
- /** Deployment enabled flag. */
- private boolean depEnabled;
- /** Group lock entry flag. */
- private boolean grpLock;
--
/** Expiry policy. */
+ @GridDirectTransient
private ExpiryPolicy expiryPlc;
/** Expiry policy transfer flag. */
@@@ -283,7 -296,7 +280,6 @@@
cp.ttl = ttl;
cp.conflictExpireTime = conflictExpireTime;
cp.explicitVer = explicitVer;
- cp.depEnabled = depEnabled;
- cp.grpLock = grpLock;
cp.conflictVer = conflictVer;
cp.expiryPlc = expiryPlc;
@@@ -831,89 -764,87 +747,81 @@@
}
/** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeBoolean(depEnabled);
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
- if (depEnabled) {
- U.writeByteArray(out, keyBytes);
- U.writeByteArray(out, transformClosBytes);
- U.writeByteArray(out, filterBytes);
- }
- else {
- out.writeObject(key);
- U.writeCollection(out, entryProcessorsCol);
- U.writeArray(out, filters);
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
}
- out.writeInt(cacheId);
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeInt("cacheId", cacheId))
+ return false;
- val.writeTo(out);
+ writer.incrementState();
- out.writeLong(ttl);
+ case 1:
+ if (!writer.writeLong("conflictExpireTime", conflictExpireTime))
+ return false;
- CU.writeVersion(out, explicitVer);
+ writer.incrementState();
- if (conflictExpireTime != CU.EXPIRE_TIME_CALCULATE) {
- out.writeBoolean(true);
- out.writeLong(conflictExpireTime);
- }
- else
- out.writeBoolean(false);
+ case 2:
+ if (!writer.writeMessage("conflictVer", conflictVer))
+ return false;
- CU.writeVersion(out, conflictVer);
+ writer.incrementState();
- out.writeObject(transferExpiryPlc ? new IgniteExternalizableExpiryPolicy(expiryPlc) : null);
- }
+ case 3:
+ if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
+ return false;
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- depEnabled = in.readBoolean();
-
- if (depEnabled) {
- keyBytes = U.readByteArray(in);
- transformClosBytes = U.readByteArray(in);
- filterBytes = U.readByteArray(in);
- }
- else {
- key = (K)in.readObject();
- entryProcessorsCol = U.readCollection(in);
- filters = GridCacheUtils.readEntryFilterArray(in);
- }
+ writer.incrementState();
- cacheId = in.readInt();
+ case 4:
+ if (!writer.writeMessage("explicitVer", explicitVer))
+ return false;
- val.readFrom(in);
+ writer.incrementState();
- ttl = in.readLong();
+ case 5:
+ if (!writer.writeObjectArray("filters",
+ !F.isEmptyOrNulls(filters) ? filters : null, MessageCollectionItemType.MSG))
+ return false;
- explicitVer = CU.readVersion(in);
+ writer.incrementState();
- conflictExpireTime = in.readBoolean() ? in.readLong() : CU.EXPIRE_TIME_CALCULATE;
- conflictVer = CU.readVersion(in);
- case 6:
- if (!writer.writeBoolean("grpLock", grpLock))
- return false;
-
- writer.incrementState();
-
+ case 7:
+ if (!writer.writeMessage("key", key))
+ return false;
- expiryPlc = (ExpiryPolicy)in.readObject();
- }
+ writer.incrementState();
- /** {@inheritDoc} */
- @Override public Object ggClassId() {
- return GG_CLASS_ID;
- }
+ case 8:
+ if (!writer.writeByteArray("transformClosBytes", transformClosBytes))
+ return false;
- /** {@inheritDoc} */
- @Override public Class<?> deployClass() {
- ClassLoader clsLdr = getClass().getClassLoader();
+ writer.incrementState();
- V val = value();
+ case 9:
+ if (!writer.writeLong("ttl", ttl))
+ return false;
- // First of all check classes that may be loaded by class loader other than application one.
- return key != null && !clsLdr.equals(key.getClass().getClassLoader()) ?
- key.getClass() : val != null ? val.getClass() : getClass();
- }
+ writer.incrementState();
- /** {@inheritDoc} */
- @Override public ClassLoader classLoader() {
- return deployClass().getClassLoader();
+ case 10:
+ if (!writer.writeMessage("val", val))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index ca58d6b,da30a94..8e0380c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@@ -409,32 -409,17 +409,32 @@@ public class IgniteTxHandler
assert nodeId != null;
assert res != null;
- GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.xid(),
- res.futureId());
+ if (res.checkCommitted()) {
- GridNearTxFinishFuture<K, V> fut = (GridNearTxFinishFuture<K, V>)ctx.mvcc().<IgniteInternalTx>future(
++ GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(
+ res.xid(), res.futureId());
- if (fut == null) {
- if (log.isDebugEnabled())
- log.debug("Received response for unknown future (will ignore): " + res);
+ if (fut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Received response for unknown future (will ignore): " + res);
- return;
+ return;
+ }
+
+ fut.onResult(nodeId, res);
}
+ else {
- GridDhtTxFinishFuture<K, V> fut = (GridDhtTxFinishFuture<K, V>)ctx.mvcc().<IgniteInternalTx>future(
++ GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(
+ res.xid(), res.futureId());
- fut.onResult(nodeId, res);
+ if (fut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Received response for unknown future (will ignore): " + res);
+
+ return;
+ }
+
+ fut.onResult(nodeId, res);
+ }
}
/**
@@@ -756,9 -758,9 +756,9 @@@
finish(nodeId, nearTx, req);
if (dhtTx != null && !dhtTx.done()) {
- dhtTx.finishFuture().listenAsync(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+ dhtTx.finishFuture().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
@Override public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) {
- sendReply(nodeId, req);
+ sendReply(nodeId, req, true);
}
});
}
@@@ -870,17 -873,9 +870,17 @@@
* @param nodeId Node id that originated finish request.
* @param req Request.
*/
- protected void sendReply(UUID nodeId, GridDhtTxFinishRequest<K, V> req, boolean committed) {
- protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req) {
++ protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed) {
if (req.replyRequired()) {
- GridDhtTxFinishResponse<K, V> res = new GridDhtTxFinishResponse<>(req.version(), req.futureId(), req.miniId());
- GridCacheMessage res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
++ GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
+
+ if (req.checkCommitted()) {
+ res.checkCommitted(true);
+
+ if (!committed)
+ res.checkCommittedError(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
+ "(transaction has been rolled back on backup node): " + req.version()));
+ }
try {
ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
@@@ -1090,64 -1083,15 +1092,64 @@@
* @param nodeId Node ID.
* @param req Request.
*/
- protected void processCheckPreparedTxRequest(UUID nodeId, GridCacheOptimisticCheckPreparedTxRequest req) {
+ protected void processCheckPreparedTxRequest(
+ final UUID nodeId,
- final GridCacheOptimisticCheckPreparedTxRequest<K, V> req
++ final GridCacheOptimisticCheckPreparedTxRequest req
+ ) {
if (log.isDebugEnabled())
log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']');
- boolean prepared = ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
+ if (req.nearCheck()) {
+ IgniteInternalFuture<Boolean> fut = ctx.tm().nearTxCommitted(req.nearXidVersion());
+
- fut.listenAsync(new CI1<IgniteInternalFuture<Boolean>>() {
++ fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> f) {
+ try {
+ boolean prepared = f.get();
- GridCacheOptimisticCheckPreparedTxResponse res =
- new GridCacheOptimisticCheckPreparedTxResponse(req.version(), req.futureId(), req.miniId(), prepared);
+ sendCheckPrepareTxResponse(nodeId,
+ new GridCacheOptimisticCheckPreparedTxResponse<K, V>(
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ prepared),
+ req.system());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to wait for transaction check prepared future " +
+ "(will send rolled back response): " + req.nearXidVersion(), e);
+
+ sendCheckPrepareTxResponse(nodeId,
+ new GridCacheOptimisticCheckPreparedTxResponse<K, V>(
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ false),
+ req.system());
+ }
+ }
+ });
+ }
+ else {
+ boolean prepared = ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
+
+ sendCheckPrepareTxResponse(nodeId,
- new GridCacheOptimisticCheckPreparedTxResponse<K, V>(req.version(), req.futureId(), req.miniId(), prepared),
++ new GridCacheOptimisticCheckPreparedTxResponse(req.version(), req.futureId(), req.miniId(), prepared),
+ req.system());
+ }
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Response to send.
+ * @param sys System pool flag.
+ */
+ private void sendCheckPrepareTxResponse(
+ UUID nodeId,
- GridCacheOptimisticCheckPreparedTxResponse<K, V> res,
++ GridCacheOptimisticCheckPreparedTxResponse res,
+ boolean sys
+ ) {
try {
if (log.isDebugEnabled())
log.debug("Sending check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index ca18a69,ca85838..55952c8
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@@ -108,9 -122,11 +110,9 @@@ public abstract class IgniteTxLocalAdap
* @param isolation Isolation.
* @param timeout Timeout.
* @param txSize Expected transaction size.
- * @param grpLockKey Group lock key if this is a group-lock transaction.
- * @param partLock {@code True} if this is a group-lock transaction and lock is acquired for whole partition.
*/
protected IgniteTxLocalAdapter(
- GridCacheSharedContext<K, V> cctx,
+ GridCacheSharedContext cctx,
GridCacheVersion xidVer,
boolean implicit,
boolean implicitSingle,
@@@ -125,8 -141,14 +128,24 @@@
@Nullable UUID subjId,
int taskNameHash
) {
- super(cctx, xidVer, implicit, implicitSingle, /*local*/true, sys, concurrency, isolation, timeout, invalidate,
- storeEnabled, onePhaseCommit, txSize, subjId, taskNameHash);
- super(cctx, xidVer, implicit, implicitSingle, /*local*/true, sys, plc, concurrency, isolation, timeout,
- invalidate, storeEnabled, txSize, grpLockKey, subjId, taskNameHash);
-
- assert !partLock || grpLockKey != null;
-
- this.partLock = partLock;
++ super(
++ cctx,
++ xidVer,
++ implicit,
++ implicitSingle,
++ /*local*/true,
++ sys,
++ plc,
++ concurrency,
++ isolation,
++ timeout,
++ invalidate,
++ storeEnabled,
++ onePhaseCommit,
++ txSize,
++ subjId,
++ taskNameHash
++ );
minVer = xidVer;
}
@@@ -456,16 -491,16 +483,16 @@@
* @throws IgniteCheckedException If batch update failed.
*/
@SuppressWarnings({"CatchGenericClass"})
- protected void batchStoreCommit(Iterable<IgniteTxEntry<K, V>> writeEntries) throws IgniteCheckedException {
- GridCacheStoreManager<K, V> store = store();
+ protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException {
+ GridCacheStoreManager store = store();
if (store != null && store.writeThrough() && storeEnabled() &&
- (!internal() || groupLock()) && (near() || store.writeToStoreFromDht())) {
+ !internal() && (near() || store.writeToStoreFromDht())) {
try {
if (writeEntries != null) {
- Map<K, IgniteBiTuple<V, GridCacheVersion>> putMap = null;
- List<K> rmvCol = null;
- GridCacheStoreManager<K, V> writeStore = null;
+ Map<Object, IgniteBiTuple<Object, GridCacheVersion>> putMap = null;
+ List<Object> rmvCol = null;
+ GridCacheStoreManager writeStore = null;
boolean skipNear = near() && store.writeToStoreFromDht();
@@@ -951,9 -981,9 +970,9 @@@
}
}
else {
- GridCacheStoreManager<K, V> store = store();
+ GridCacheStoreManager store = store();
- if (store != null && (!internal() || groupLock())) {
+ if (store != null && !internal()) {
try {
store.txEnd(this, true);
}
@@@ -1013,10 -1086,10 +1032,10 @@@
cctx.tm().rollbackTx(this);
- GridCacheStoreManager<K, V> store = store();
+ GridCacheStoreManager store = store();
if (store != null && (near() || store.writeToStoreFromDht())) {
- if (!internal() || groupLock())
+ if (!internal())
store.txEnd(this, false);
}
}
@@@ -1062,11 -1137,13 +1083,11 @@@
cacheCtx.checkSecurity(GridSecurityPermission.CACHE_READ);
- groupLockSanityCheck(cacheCtx, keys);
-
boolean single = keysCnt == 1;
- Collection<K> lockKeys = null;
+ Collection<KeyCacheObject> lockKeys = null;
- long topVer = topologyVersion();
+ AffinityTopologyVersion topVer = topologyVersion();
// In this loop we cover only read-committed or optimistic transactions.
// Transactions that are pessimistic and not read-committed are covered
@@@ -1092,17 -1166,11 +1110,11 @@@
if (!F.isEmpty(txEntry.entryProcessors()))
val = txEntry.applyEntryProcessors(val);
- if (val != null) {
- V val0 = val;
-
- if (cacheCtx.portableEnabled())
- val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
-
- map.put(key, (V)CU.skipValue(val0, skipVals));
- }
+ if (val != null)
+ cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializePortable, false);
}
else {
- assert txEntry.op() == TRANSFORM || (groupLock() && !txEntry.groupLockEntry());
+ assert txEntry.op() == TRANSFORM;
while (true) {
try {
@@@ -1177,9 -1246,9 +1190,9 @@@
try {
GridCacheVersion ver = entry.version();
- V val = null;
+ CacheObject val = null;
- if (!pessimistic() || readCommitted() || groupLock() && !skipVals) {
+ if ((!pessimistic() || readCommitted()) && !skipVals) {
IgniteCacheExpiryPolicy accessPlc =
optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
@@@ -1336,159 -1411,182 +1352,181 @@@
if (log.isDebugEnabled())
log.debug("Loading missed values for missed map: " + missedMap);
- final Collection<K> loaded = new HashSet<>();
-
- return new GridEmbeddedFuture<>(cctx.kernalContext(),
- loadMissing(
- cacheCtx,
- true, false, missedMap.keySet(), deserializePortable, skipVals, new CI2<K, V>() {
- /** */
- private GridCacheVersion nextVer;
+ final Collection<KeyCacheObject> loaded = new HashSet<>();
- @Override public void apply(K key, V val) {
- if (isRollbackOnly()) {
- if (log.isDebugEnabled())
- log.debug("Ignoring loaded value for read because transaction was rolled back: " +
- IgniteTxLocalAdapter.this);
+ return new GridEmbeddedFuture<>(
+ new C2<Boolean, Exception, Map<K, V>>() {
+ @Override public Map<K, V> apply(Boolean b, Exception e) {
+ if (e != null) {
+ setRollbackOnly();
- return;
+ throw new GridClosureException(e);
}
- GridCacheVersion ver = missedMap.get(key);
-
- if (ver == null) {
- if (log.isDebugEnabled())
- log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']');
+ if (!b && !readCommitted()) {
+ // There is no store - we must mark the entries.
+ for (KeyCacheObject key : missedMap.keySet()) {
+ IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
- return;
+ if (txEntry != null)
+ txEntry.markValid();
+ }
}
- V visibleVal = val;
+ if (readCommitted()) {
+ Collection<KeyCacheObject> notFound = new HashSet<>(missedMap.keySet());
- IgniteTxKey<K> txKey = cacheCtx.txKey(key);
+ notFound.removeAll(loaded);
- IgniteTxEntry<K, V> txEntry = entry(txKey);
+ // In read-committed mode touch entries that have just been read.
+ for (KeyCacheObject key : notFound) {
+ IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
- if (txEntry != null) {
- if (!readCommitted())
- txEntry.readValue(val);
+ GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().peekEx(key) :
+ txEntry.cached();
- if (!F.isEmpty(txEntry.entryProcessors()))
- visibleVal = txEntry.applyEntryProcessors(visibleVal);
+ if (entry != null)
+ cacheCtx.evicts().touch(entry, topologyVersion());
+ }
}
- // In pessimistic mode we hold the lock, so filter validation
- // should always be valid.
- if (pessimistic())
- ver = null;
+ return map;
+ }
+ },
+ loadMissing(
+ cacheCtx,
+ true,
+ false,
+ missedMap.keySet(),
+ deserializePortable,
+ skipVals,
+ new CI2<KeyCacheObject, Object>() {
+ /** */
+ private GridCacheVersion nextVer;
- // Initialize next version.
- if (nextVer == null)
- nextVer = cctx.versions().next(topologyVersion());
+ @Override public void apply(KeyCacheObject key, Object val) {
+ if (isRollbackOnly()) {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring loaded value for read because transaction was rolled back: " +
+ IgniteTxLocalAdapter.this);
- while (true) {
- assert txEntry != null || readCommitted() || skipVals;
+ return;
+ }
- GridCacheEntryEx<K, V> e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
+ GridCacheVersion ver = missedMap.get(key);
- try {
- // Must initialize to true since even if filter didn't pass,
- // we still record the transaction value.
- boolean set;
+ if (ver == null) {
+ if (log.isDebugEnabled())
+ log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']');
- try {
- set = e.versionedValue(val, ver, nextVer);
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry in transaction getAll method " +
- "(will try again): " + e);
+ return;
+ }
- if (pessimistic() && !readCommitted() && !isRollbackOnly()) {
- U.error(log, "Inconsistent transaction state (entry got removed while " +
- "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]");
+ CacheObject cacheVal = cacheCtx.toCacheObject(val);
- setRollbackOnly();
+ CacheObject visibleVal = cacheVal;
- return;
- }
+ IgniteTxKey txKey = cacheCtx.txKey(key);
- if (txEntry != null)
- txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes());
+ IgniteTxEntry txEntry = entry(txKey);
- continue; // While loop.
- }
+ if (txEntry != null) {
+ if (!readCommitted())
+ txEntry.readValue(cacheVal);
- // In pessimistic mode, we should always be able to set.
- assert set || !pessimistic();
+ if (!F.isEmpty(txEntry.entryProcessors()))
+ visibleVal = txEntry.applyEntryProcessors(visibleVal);
+ }
- if (readCommitted() || skipVals) {
- cacheCtx.evicts().touch(e, topologyVersion());
+ // In pessimistic mode we hold the lock, so filter validation
+ // should always be valid.
+ if (pessimistic())
+ ver = null;
- if (visibleVal != null)
- map.put(key, (V)CU.skipValue(visibleVal, skipVals));
- }
- else {
- assert txEntry != null;
+ // Initialize next version.
+ if (nextVer == null)
+ nextVer = cctx.versions().next(topologyVersion());
- txEntry.setAndMarkValid(val);
+ while (true) {
- assert txEntry != null || readCommitted() || groupLock() || skipVals;
++ assert txEntry != null || readCommitted() || skipVals;
- if (visibleVal != null)
- map.put(key, visibleVal);
- }
+ GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
- loaded.add(key);
+ try {
+ // Must initialize to true since even if filter didn't pass,
+ // we still record the transaction value.
+ boolean set;
- if (log.isDebugEnabled())
- log.debug("Set value loaded from store into entry from transaction [set=" + set +
- ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']');
+ try {
+ set = e.versionedValue(cacheVal, ver, nextVer);
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in transaction getAll method " +
+ "(will try again): " + e);
- break; // While loop.
- }
- catch (IgniteCheckedException ex) {
- throw new IgniteException("Failed to put value for cache entry: " + e, ex);
- }
- }
- }
- }),
- new C2<Boolean, Exception, Map<K, V>>() {
- @Override public Map<K, V> apply(Boolean b, Exception e) {
- if (e != null) {
- setRollbackOnly();
- if (pessimistic() && !readCommitted() && !isRollbackOnly() &&
- (!groupLock() || F.eq(e.key(), groupLockKey()))) {
++ if (pessimistic() && !readCommitted() && !isRollbackOnly()) {
+ U.error(log, "Inconsistent transaction state (entry got removed while " +
+ "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]");
- throw new GridClosureException(e);
- }
+ setRollbackOnly();
- if (!b && !readCommitted()) {
- // There is no store - we must mark the entries.
- for (K key : missedMap.keySet()) {
- IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key));
+ return;
+ }
- if (txEntry != null)
- txEntry.markValid();
- }
- }
+ if (txEntry != null)
+ txEntry.cached(entryEx(cacheCtx, txKey));
- if (readCommitted()) {
- Collection<K> notFound = new HashSet<>(missedMap.keySet());
+ continue; // While loop.
+ }
- notFound.removeAll(loaded);
+ // In pessimistic mode, we should always be able to set.
+ assert set || !pessimistic();
- // In read-committed mode touch entries that have just been read.
- for (K key : notFound) {
- IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key));
- if (readCommitted() || groupLock() || skipVals) {
++ if (readCommitted() || skipVals) {
+ cacheCtx.evicts().touch(e, topologyVersion());
- GridCacheEntryEx<K, V> entry = txEntry == null ? cacheCtx.cache().peekEx(key) :
- txEntry.cached();
+ if (visibleVal != null) {
+ cacheCtx.addResult(map,
+ key,
+ visibleVal,
+ skipVals,
+ keepCacheObjects,
+ deserializePortable,
+ false);
+ }
+ }
+ else {
+ assert txEntry != null;
+
+ txEntry.setAndMarkValid(cacheVal);
+
+ if (visibleVal != null) {
+ cacheCtx.addResult(map,
+ key,
+ visibleVal,
+ skipVals,
+ keepCacheObjects,
+ deserializePortable,
+ false);
+ }
+ }
- if (entry != null)
- cacheCtx.evicts().touch(entry, topologyVersion());
+ loaded.add(key);
+
+ if (log.isDebugEnabled())
+ log.debug("Set value loaded from store into entry from transaction [set=" + set +
+ ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']');
+
+ break; // While loop.
+ }
+ catch (IgniteCheckedException ex) {
+ throw new IgniteException("Failed to put value for cache entry: " + e, ex);
+ }
}
}
-
- return map;
- }
- });
+ })
+ );
}
/** {@inheritDoc} */
@@@ -1526,13 -1625,14 +1565,14 @@@
missed,
keysCnt,
deserializePortable,
- skipVals);
+ skipVals,
+ keepCacheObjects);
if (single && missed.isEmpty())
- return new GridFinishedFuture<>(cctx.kernalContext(), retMap);
+ return new GridFinishedFuture<>(retMap);
// Handle locks.
- if (pessimistic() && !readCommitted() && !groupLock() && !skipVals) {
+ if (pessimistic() && !readCommitted() && !skipVals) {
if (expiryPlc == null)
expiryPlc = cacheCtx.expiry();
@@@ -1677,9 -1789,9 +1729,9 @@@
}
}
else {
- assert optimistic() || readCommitted() || groupLock() || skipVals;
+ assert optimistic() || readCommitted() || skipVals;
- final Collection<K> redos = new ArrayList<>();
+ final Collection<KeyCacheObject> redos = new ArrayList<>();
if (!missed.isEmpty()) {
if (!readCommitted())
@@@ -1872,7 -2002,9 +1942,7 @@@
if (invokeMap != null)
transform = true;
- for (K key : keys) {
- groupLockSanityCheck(cacheCtx, keys);
-
+ for (Object key : keys) {
if (key == null) {
setRollbackOnly();
@@@ -2027,9 -2158,12 +2096,9 @@@
if (!implicit() && readCommitted())
cacheCtx.evicts().touch(entry, topologyVersion());
- enlisted.add(key);
- if (groupLock() && !lockOnly)
- txEntry.groupLockEntry(true);
-
+ enlisted.add(cacheKey);
- if ((!pessimistic() && !implicit()) || (groupLock() && !lockOnly)) {
+ if ((!pessimistic() && !implicit())) {
txEntry.markValid();
if (old == null) {
@@@ -2496,22 -2602,11 +2537,11 @@@
drMap,
null);
- if (pessimistic() && !groupLock()) {
+ if (pessimistic()) {
// Loose all skipped.
- final Set<K> loaded = loadFut.get();
-
- final Collection<K> keys;
+ final Set<KeyCacheObject> loaded = loadFut.get();
- if (keySet != null ) {
- keys = new ArrayList<>(keySet.size());
-
- for (K k : keySet) {
- if (k != null && (loaded == null || !loaded.contains(k)))
- keys.add(k);
- }
- }
- else
- keys = Collections.emptyList();
+ final Collection<KeyCacheObject> keys = F.view(enlisted, F0.notIn(loaded));
if (log.isDebugEnabled())
log.debug("Before acquiring transaction lock for put on keys: " + keys);
@@@ -2733,9 -2824,9 +2759,9 @@@
// Acquire locks only after having added operation to the write set.
// Otherwise, during rollback we will not know whether locks need
// to be rolled back.
- if (pessimistic() && !groupLock()) {
+ if (pessimistic()) {
// Loose all skipped.
- final Collection<? extends K> passedKeys = F.view(enlisted, F0.notIn(loadFut.get()));
+ final Collection<KeyCacheObject> passedKeys = F.view(enlisted, F0.notIn(loadFut.get()));
if (log.isDebugEnabled())
log.debug("Before acquiring transaction lock for remove on keys: " + passedKeys);
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index e372095,9f1a70e..e06cde0
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@@ -131,13 -138,27 +133,13 @@@ public interface IgniteTxLocalEx extend
* @return Future for asynchronous remove.
*/
public IgniteInternalFuture<?> removeAllDrAsync(
- GridCacheContext<K, V> cacheCtx,
- Map<? extends K, GridCacheVersion> drMap);
+ GridCacheContext cacheCtx,
+ Map<KeyCacheObject, GridCacheVersion> drMap);
/**
- * Performs keys locking for affinity-based group lock transactions.
- *
- * @param cacheCtx Cache context.
- * @param keys Keys to lock.
- * @return Lock future.
- */
- public <K> IgniteInternalFuture<?> groupLockAsync(GridCacheContext cacheCtx, Collection<K> keys);
-
- /**
- * @return {@code True} if keys from the same partition are allowed to be enlisted in group-lock transaction.
- */
- public boolean partitionLock();
-
- /**
* @return Return value for
*/
- public GridCacheReturn<V> implicitSingleResult();
+ public GridCacheReturn implicitSingleResult();
/**
* Finishes transaction (either commit or rollback).
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 974144a,7ea6e3a..14e1db2
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@@ -200,9 -205,10 +205,9 @@@ public class IgniteTxManager extends Gr
}
if (tx instanceof IgniteTxRemoteEx) {
- IgniteTxRemoteEx<K, V> rmtTx = (IgniteTxRemoteEx<K, V>)tx;
+ IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
- rmtTx.doneRemote(tx.xidVersion(), Collections.<GridCacheVersion>emptyList(),
- Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList());
+ rmtTx.doneRemote(tx.xidVersion());
}
tx.commit();
@@@ -334,8 -340,8 +339,8 @@@
* @return {@code True} if transaction has been committed or rolled back,
* {@code false} otherwise.
*/
- public boolean isCompleted(IgniteInternalTx<K, V> tx) {
+ public boolean isCompleted(IgniteInternalTx tx) {
- return completedVers.containsKey(tx.xidVersion());
+ return completedVers.containsKey(tx.writeVersion());
}
/**
@@@ -345,19 -351,24 +350,21 @@@
* @param isolation Isolation.
* @param timeout transaction timeout.
* @param txSize Expected transaction size.
- * @param grpLockKey Group lock key if this is a group-lock transaction.
- * @param partLock {@code True} if partition is locked.
* @return New transaction.
*/
- public IgniteTxLocalAdapter<K, V> newTx(
+ public IgniteTxLocalAdapter newTx(
boolean implicit,
boolean implicitSingle,
- boolean sys,
+ @Nullable GridCacheContext sysCacheCtx,
TransactionConcurrency concurrency,
TransactionIsolation isolation,
long timeout,
boolean invalidate,
boolean storeEnabled,
- int txSize,
- @Nullable IgniteTxKey grpLockKey,
- boolean partLock) {
+ int txSize
+ ) {
+ assert sysCacheCtx == null || sysCacheCtx.system();
-
++
UUID subjId = null; // TODO GG-9141 how to get subj ID?
int taskNameHash = cctx.kernalContext().job().currentTaskNameHash();
@@@ -936,18 -969,44 +963,18 @@@
}
/**
- * Gets committed transactions starting from the given version (inclusive). // TODO: GG-4011: why inclusive?
- *
- * @param min Start (or minimum) version.
- * @return Committed transactions starting from the given version (non-inclusive).
- */
- public Collection<GridCacheVersion> committedVersions(GridCacheVersion min) {
- ConcurrentNavigableMap<GridCacheVersion, Boolean> tail
- = completedVers.tailMap(min, true);
-
- return F.isEmpty(tail) ? Collections.<GridCacheVersion>emptyList() : copyOf(tail, true);
- }
-
- /**
- * Gets rolledback transactions starting from the given version (inclusive). // TODO: GG-4011: why inclusive?
- *
- * @param min Start (or minimum) version.
- * @return Committed transactions starting from the given version (non-inclusive).
- */
- public Collection<GridCacheVersion> rolledbackVersions(GridCacheVersion min) {
- ConcurrentNavigableMap<GridCacheVersion, Boolean> tail
- = completedVers.tailMap(min, true);
-
- return F.isEmpty(tail) ? Collections.<GridCacheVersion>emptyList() : copyOf(tail, false);
- }
-
- /**
* @param tx Tx to remove.
*/
- public void removeCommittedTx(IgniteInternalTx<K, V> tx) {
+ public void removeCommittedTx(IgniteInternalTx tx) {
- completedVers.remove(tx.xidVersion(), true);
+ completedVers.remove(tx.writeVersion(), true);
}
/**
* @param tx Committed transaction.
* @return If transaction was not already present in committed set.
*/
- public boolean addCommittedTx(IgniteInternalTx<K, V> tx) {
+ public boolean addCommittedTx(IgniteInternalTx tx) {
- return addCommittedTx(tx.xidVersion(), tx.nearXidVersion());
+ return addCommittedTx(tx.writeVersion(), tx.nearXidVersion());
}
/**
@@@ -1069,9 -1198,18 +1096,9 @@@
completedVers.firstKey() + ", lastVer=" + completedVers.lastKey() + ", tx=" + tx.xid() + ']');
}
- ConcurrentMap<GridCacheVersion, IgniteInternalTx<K, V>> txIdMap = transactionMap(tx);
+ ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx);
if (txIdMap.remove(tx.xidVersion(), tx)) {
- // 2. Must process completed entries before unlocking!
- processCompletedEntries(tx);
-
- if (tx instanceof GridDhtTxLocal) {
- GridDhtTxLocal dhtTxLoc = (GridDhtTxLocal)tx;
-
- collectPendingVersions(dhtTxLoc);
- }
-
// 3.1 Call dataStructures manager.
cctx.kernalContext().dataStructures().onTxCommitted(tx);
@@@ -1277,11 -1445,11 +1331,11 @@@
/**
* @param tx Transaction to notify evictions for.
*/
- private void notifyEvitions(IgniteInternalTx<K, V> tx) {
+ private void notifyEvitions(IgniteInternalTx tx) {
- if (tx.internal() && !tx.groupLock())
+ if (tx.internal())
return;
- for (IgniteTxEntry<K, V> txEntry : tx.allEntries())
+ for (IgniteTxEntry txEntry : tx.allEntries())
txEntry.cached().context().evicts().touch(txEntry, tx.local());
}
@@@ -1721,9 -1911,10 +1775,9 @@@
}
if (tx instanceof GridDistributedTxRemoteAdapter) {
- IgniteTxRemoteEx<K,V> rmtTx = (IgniteTxRemoteEx<K, V>)tx;
+ IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
- rmtTx.doneRemote(tx.xidVersion(), Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList(),
- Collections.<GridCacheVersion>emptyList());
+ rmtTx.doneRemote(tx.xidVersion());
}
if (commit)
@@@ -1747,9 -1938,12 +1801,9 @@@
}
if (tx instanceof GridDistributedTxRemoteAdapter) {
- IgniteTxRemoteEx<K,V> rmtTx = (IgniteTxRemoteEx<K, V>)tx;
+ IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
- rmtTx.doneRemote(tx.xidVersion(),
- Collections.<GridCacheVersion>emptyList(),
- Collections.<GridCacheVersion>emptyList(),
- Collections.<GridCacheVersion>emptyList());
+ rmtTx.doneRemote(tx.xidVersion());
}
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
----------------------------------------------------------------------