You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/17 15:15:02 UTC
ignite git commit: ignite-1561
Repository: ignite
Updated Branches:
refs/heads/ignite-1561-1 045a1b29f -> 84df73e30
ignite-1561
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/84df73e3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/84df73e3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/84df73e3
Branch: refs/heads/ignite-1561-1
Commit: 84df73e3080e58fd8f79eb0343ca2453b621c34f
Parents: 045a1b2
Author: sboikov <sb...@gridgain.com>
Authored: Mon Apr 17 13:59:18 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Apr 17 18:09:28 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheUtils.java | 60 +++++-
.../distributed/GridDistributedTxMapping.java | 66 +++++-
...arOptimisticSerializableTxPrepareFuture.java | 213 +++++++++++++------
.../near/GridNearOptimisticTxPrepareFuture.java | 110 ++++++----
.../GridNearPessimisticTxPrepareFuture.java | 165 +++++++++-----
.../near/GridNearTxPrepareFutureAdapter.java | 31 +--
.../cache/transactions/IgniteTxHandler.java | 48 +----
.../lang/gridfunc/TransformCollectionView.java | 5 -
.../dht/IgniteCrossCacheTxSelfTest.java | 12 +-
9 files changed, 473 insertions(+), 237 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/84df73e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index d20a782..9ccc338 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -247,9 +247,9 @@ public class GridCacheUtils {
private static final CacheEntryPredicate[] ALWAYS_FALSE0_ARR = new CacheEntryPredicate[] {ALWAYS_FALSE0};
/** Read filter. */
- private static final IgnitePredicate READ_FILTER = new P1<Object>() {
- @Override public boolean apply(Object e) {
- return ((IgniteTxEntry)e).op() == READ;
+ public static final IgnitePredicate READ_FILTER = new P1<IgniteTxEntry>() {
+ @Override public boolean apply(IgniteTxEntry e) {
+ return e.op() == READ;
}
@Override public String toString() {
@@ -257,10 +257,54 @@ public class GridCacheUtils {
}
};
+ /** Read filter. */
+ public static final IgnitePredicate READ_FILTER_NEAR = new P1<IgniteTxEntry>() {
+ @Override public boolean apply(IgniteTxEntry e) {
+ return e.op() == READ && e.context().isNear();
+ }
+
+ @Override public String toString() {
+ return "Cache transaction read filter";
+ }
+ };
+
+ /** Read filter. */
+ public static final IgnitePredicate READ_FILTER_COLOCATED = new P1<IgniteTxEntry>() {
+ @Override public boolean apply(IgniteTxEntry e) {
+ return e.op() == READ && !e.context().isNear();
+ }
+
+ @Override public String toString() {
+ return "Cache transaction read filter";
+ }
+ };
+
+ /** Write filter. */
+ public static final IgnitePredicate WRITE_FILTER = new P1<IgniteTxEntry>() {
+ @Override public boolean apply(IgniteTxEntry e) {
+ return e.op() != READ;
+ }
+
+ @Override public String toString() {
+ return "Cache transaction write filter";
+ }
+ };
+
+ /** Write filter. */
+ public static final IgnitePredicate WRITE_FILTER_NEAR = new P1<IgniteTxEntry>() {
+ @Override public boolean apply(IgniteTxEntry e) {
+ return e.op() != READ && e.context().isNear();
+ }
+
+ @Override public String toString() {
+ return "Cache transaction write filter";
+ }
+ };
+
/** Write filter. */
- private static final IgnitePredicate WRITE_FILTER = new P1<Object>() {
- @Override public boolean apply(Object e) {
- return ((IgniteTxEntry)e).op() != READ;
+ public static final IgnitePredicate WRITE_FILTER_COLOCATED = new P1<IgniteTxEntry>() {
+ @Override public boolean apply(IgniteTxEntry e) {
+ return e.op() != READ && !e.context().isNear();
}
@Override public String toString() {
@@ -613,7 +657,7 @@ public class GridCacheUtils {
* @return Filter for transaction reads.
*/
@SuppressWarnings({"unchecked"})
- public static <K, V> IgnitePredicate<IgniteTxEntry> reads() {
+ public static IgnitePredicate<IgniteTxEntry> reads() {
return READ_FILTER;
}
@@ -621,7 +665,7 @@ public class GridCacheUtils {
* @return Filter for transaction writes.
*/
@SuppressWarnings({"unchecked"})
- public static <K, V> IgnitePredicate<IgniteTxEntry> writes() {
+ public static IgnitePredicate<IgniteTxEntry> writes() {
return WRITE_FILTER;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/84df73e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index 9d86244..a15c00a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -58,7 +58,7 @@ public class GridDistributedTxMapping {
private boolean clientFirst;
/** */
- private boolean hasNear;
+ private int nearEntries;
/**
* @param primary Primary node.
@@ -97,8 +97,18 @@ public class GridDistributedTxMapping {
this.clientFirst = clientFirst;
}
+ /**
+ * @return {@code True} if has colocated cache entries.
+ */
+ public boolean hasColocatedCacheEntries() {
+ return entries.size() > nearEntries;
+ }
+
+ /**
+ * @return {@code True} if has near cache entries.
+ */
public boolean hasNearCacheEntries() {
- return hasNear;
+ return nearEntries > 0;
}
/**
@@ -119,7 +129,9 @@ public class GridDistributedTxMapping {
* @return Near cache entries.
*/
@Nullable public List<IgniteTxEntry> nearCacheEntries() {
- assert hasNear;
+ assert nearEntries > 0;
+
+ // TODO IGNITE-1561.
List<IgniteTxEntry> nearCacheEntries = new ArrayList<>();
@@ -167,24 +179,62 @@ public class GridDistributedTxMapping {
* @return Reads.
*/
public Collection<IgniteTxEntry> reads() {
- return F.view(entries, CU.reads());
+ return F.view(entries, CU.READ_FILTER);
}
/**
* @return Writes.
*/
public Collection<IgniteTxEntry> writes() {
- return F.view(entries, CU.writes());
+ return F.view(entries, CU.WRITE_FILTER);
+ }
+
+ /**
+ * @return Near cache reads.
+ */
+ public Collection<IgniteTxEntry> nearEntriesReads() {
+ assert hasNearCacheEntries();
+
+ return F.view(entries, CU.READ_FILTER_NEAR);
+ }
+
+ /**
+ * @return Near cache writes.
+ */
+ public Collection<IgniteTxEntry> nearEntriesWrites() {
+ assert hasNearCacheEntries();
+
+ return F.view(entries, CU.WRITE_FILTER_NEAR);
+ }
+
+ /**
+ * @return Colocated cache reads.
+ */
+ public Collection<IgniteTxEntry> colocatedEntriesReads() {
+ assert hasColocatedCacheEntries();
+
+ return F.view(entries, CU.READ_FILTER_COLOCATED);
+ }
+
+ /**
+ * @return Colocated cache writes.
+ */
+ public Collection<IgniteTxEntry> colocatedEntriesWrites() {
+ assert hasColocatedCacheEntries();
+
+ return F.view(entries, CU.WRITE_FILTER_COLOCATED);
}
/**
* @param entry Adds entry.
*/
public void add(IgniteTxEntry entry) {
- if (entry.context().isNear())
- hasNear = true;
+ boolean add = entries.add(entry);
+
+ assert add : entry;
- entries.add(entry);
+ if (entry.context().isNear())
+ nearEntries++;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/84df73e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index a54c65d..1212155 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -199,7 +200,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
MiniFuture mini = miniFuture(res.miniId());
if (mini != null)
- mini.onResult(res);
+ mini.onResult(res, true);
}
}
@@ -339,14 +340,24 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
GridDhtTxMapping txMapping = new GridDhtTxMapping();
- Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
+ Map<UUID, GridDistributedTxMapping> mappings = new HashMap<>();
- for (IgniteTxEntry write : writes)
+ boolean nearEntries = false;
+
+ for (IgniteTxEntry write : writes) {
map(write, topVer, mappings, txMapping, remap, topLocked);
- for (IgniteTxEntry read : reads)
+ if (write.context().isNear())
+ nearEntries = true;
+ }
+
+ for (IgniteTxEntry read : reads) {
map(read, topVer, mappings, txMapping, remap, topLocked);
+ if (read.context().isNear())
+ nearEntries = true;
+ }
+
if (keyLockFut != null)
keyLockFut.onAllKeysAdded();
@@ -363,12 +374,26 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
tx.transactionNodes(txMapping.transactionNodes());
- checkOnePhase(txMapping);
+ if (!nearEntries)
+ checkOnePhase(txMapping);
+
+ MiniFuture locNearOnlyFut = null;
+ // Create futures in advance to have all futures when process {@link GridNearTxPrepareResponse#clientRemapVersion}.
for (GridDistributedTxMapping m : mappings.values()) {
assert !m.empty();
- add(new MiniFuture(this, m, ++miniId));
+ MiniFuture fut = new MiniFuture(this, m, ++miniId);
+
+ add(fut);
+
+ if (m.primary().isLocal() && m.hasNearCacheEntries() && m.hasColocatedCacheEntries()) {
+ assert locNearOnlyFut == null;
+
+ locNearOnlyFut = fut;
+
+ add(new MiniFuture(this, m, ++miniId));
+ }
}
Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
@@ -383,7 +408,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
MiniFuture fut = (MiniFuture)fut0;
- IgniteCheckedException err = prepare(fut, txMapping);
+ IgniteCheckedException err = prepare(fut, txMapping, locNearOnlyFut);
if (err != null) {
while (it.hasNext()) {
@@ -419,7 +444,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
* @param fut Mini future.
* @return Prepare error if any.
*/
- @Nullable private IgniteCheckedException prepare(final MiniFuture fut, GridDhtTxMapping txMapping) {
+ @Nullable private IgniteCheckedException prepare(final MiniFuture fut,
+ GridDhtTxMapping txMapping,
+ @Nullable MiniFuture locNearOnlyFut) {
GridDistributedTxMapping m = fut.mapping();
final ClusterNode primary = m.primary();
@@ -434,36 +461,12 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
return err;
}
- GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
- futId,
- tx.topologyVersion(),
- tx,
- timeout,
- m.reads(),
- m.writes(),
- m.hasNearCacheEntries(),
- txMapping.transactionNodes(),
- m.last(),
- tx.onePhaseCommit(),
- tx.needReturnValue() && tx.implicit(),
- tx.implicitSingle(),
- m.explicitLock(),
- tx.subjectId(),
- tx.taskNameHash(),
- m.clientFirst(),
- tx.activeCachesDeploymentEnabled());
-
- for (IgniteTxEntry txEntry : m.entries()) {
- if (txEntry.op() == TRANSFORM)
- req.addDhtVersion(txEntry.txKey(), null);
- }
-
// Must lock near entries separately.
if (m.hasNearCacheEntries()) {
try {
tx.optimisticLockEntries(m.nearCacheEntries());
- tx.userPrepare();
+ cctx.tm().prepareTx(tx);
}
catch (IgniteCheckedException e) {
fut.onResult(e);
@@ -472,27 +475,37 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
}
- req.miniId(fut.futureId());
-
// If this is the primary node for the keys.
if (primary.isLocal()) {
- IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(primary.id(),
- tx,
- req);
-
- prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
- @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
- try {
- fut.onResult(prepFut.get());
- }
- catch (IgniteCheckedException e) {
- fut.onResult(e);
- }
- }
- });
+ if (locNearOnlyFut != null) {
+ boolean nearOnly = fut == locNearOnlyFut;
+
+ GridNearTxPrepareRequest req = createRequest(txMapping.transactionNodes(),
+ fut,
+ timeout,
+ nearOnly ? m.nearEntriesReads() : m.colocatedEntriesReads(),
+ nearOnly ? m.nearEntriesWrites() : m.colocatedEntriesWrites());
+
+ prepareLocal(req, fut, nearOnly, nearOnly);
+ }
+ else {
+ GridNearTxPrepareRequest req = createRequest(txMapping.transactionNodes(),
+ fut,
+ timeout,
+ m.reads(),
+ m.writes());
+
+ prepareLocal(req, fut, m.hasNearCacheEntries(), true);
+ }
}
else {
try {
+ GridNearTxPrepareRequest req = createRequest(txMapping.transactionNodes(),
+ fut,
+ timeout,
+ m.reads(),
+ m.writes());
+
cctx.io().send(primary, req, tx.ioPolicy());
}
catch (ClusterTopologyCheckedException e) {
@@ -513,6 +526,79 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
/**
+ * @param txNodes
+ * @param fut
+ * @param timeout
+ * @param reads
+ * @param writes
+ * @return Request.
+ */
+ private GridNearTxPrepareRequest createRequest(
+ Map<UUID, Collection<UUID>> txNodes,
+ MiniFuture fut,
+ long timeout,
+ Collection<IgniteTxEntry> reads,
+ Collection<IgniteTxEntry> writes) {
+ GridDistributedTxMapping m = fut.mapping();
+
+ GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
+ futId,
+ tx.topologyVersion(),
+ tx,
+ timeout,
+ reads,
+ writes,
+ m.hasNearCacheEntries(),
+ txNodes,
+ m.last(),
+ tx.onePhaseCommit(),
+ tx.needReturnValue() && tx.implicit(),
+ tx.implicitSingle(),
+ m.explicitLock(),
+ tx.subjectId(),
+ tx.taskNameHash(),
+ m.clientFirst(),
+ tx.activeCachesDeploymentEnabled());
+
+ for (IgniteTxEntry txEntry : m.entries()) {
+ if (txEntry.op() == TRANSFORM)
+ req.addDhtVersion(txEntry.txKey(), null);
+ }
+
+ req.miniId(fut.futureId());
+
+ return req;
+ }
+
+ /**
+ * @param req Request.
+ * @param nearTx Near cache mapping flag.
+ * @param updateMapping Update mapping flag.
+ */
+ private void prepareLocal(GridNearTxPrepareRequest req,
+ final MiniFuture fut,
+ final boolean nearTx,
+ final boolean updateMapping) {
+ if (nearTx)
+ req.cloneEntries();
+
+ IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = nearTx ?
+ cctx.tm().txHandler().prepareNearTx(cctx.localNodeId(), req, true) :
+ cctx.tm().txHandler().prepareColocatedTx(tx, req);
+
+ prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+ @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+ try {
+ fut.onResult(prepFut.get(), updateMapping);
+ }
+ catch (IgniteCheckedException e) {
+ fut.onResult(e);
+ }
+ }
+ });
+ }
+
+ /**
* @param entry Transaction entry.
* @param topVer Topology version.
* @param curMapping Current mapping.
@@ -522,7 +608,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
private void map(
IgniteTxEntry entry,
AffinityTopologyVersion topVer,
- Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> curMapping,
+ Map<UUID, GridDistributedTxMapping> curMapping,
GridDhtTxMapping txMapping,
boolean remap,
boolean topLocked
@@ -565,27 +651,25 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
}
- IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, cacheCtx.isNear());
-
- GridDistributedTxMapping cur = curMapping.get(key);
+ GridDistributedTxMapping cur = curMapping.get(primary.id());
if (cur == null) {
cur = new GridDistributedTxMapping(primary);
- curMapping.put(key, cur);
-
- if (primary.isLocal()) {
- if (entry.context().isNear())
- tx.nearLocallyMapped(true);
- else if (entry.context().isColocated())
- tx.colocatedLocallyMapped(true);
- }
+ curMapping.put(primary.id(), cur);
cur.clientFirst(!topLocked && cctx.kernalContext().clientNode());
cur.last(true);
}
+ if (primary.isLocal()) {
+ if (cacheCtx.isNear())
+ tx.nearLocallyMapped(true);
+ else if (cacheCtx.isColocated())
+ tx.colocatedLocallyMapped(true);
+ }
+
cur.add(entry);
if (entry.explicitVersion() != null) {
@@ -772,7 +856,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
* @param res Result callback.
*/
@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
- void onResult(final GridNearTxPrepareResponse res) {
+ void onResult(final GridNearTxPrepareResponse res, boolean updateMapping) {
if (isDone())
return;
@@ -875,7 +959,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
onDone(res);
}
else {
- parent.onPrepareResponse(m, res);
+ parent.onPrepareResponse(m, res, updateMapping);
// Finish this mini future (need result only on client node).
onDone(parent.cctx.kernalContext().clientNode() ? res : null);
@@ -889,8 +973,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
*/
private void remap(final GridNearTxPrepareResponse res) {
parent.prepareOnTopology(true, new Runnable() {
- @Override
- public void run() {
+ @Override public void run() {
onDone(res);
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/84df73e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 26af91a..fc4d8c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -180,7 +180,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
if (mini != null) {
assert mini.node().id().equals(nodeId);
- mini.onResult(res);
+ mini.onResult(res, true);
}
else {
if (msgLog.isDebugEnabled()) {
@@ -477,6 +477,60 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
proceedPrepare(m, mappings);
}
+ private void prepareLocal(GridNearTxPrepareRequest req,
+ final MiniFuture fut,
+ boolean nearTx,
+ final boolean updateMapping) {
+ IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = nearTx ?
+ cctx.tm().txHandler().prepareNearTx(cctx.localNodeId(), req, true) :
+ cctx.tm().txHandler().prepareColocatedTx(tx, req);
+
+ prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+ @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+ try {
+ fut.onResult(prepFut.get(), updateMapping);
+ }
+ catch (IgniteCheckedException e) {
+ fut.onResult(e);
+ }
+ }
+ });
+ }
+
+ private GridNearTxPrepareRequest createRequest(long timeout,
+ MiniFuture fut,
+ Collection<IgniteTxEntry> writes) {
+ GridDistributedTxMapping m = fut.mapping();
+
+ GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
+ futId,
+ tx.topologyVersion(),
+ tx,
+ timeout,
+ null,
+ writes,
+ m.hasNearCacheEntries(),
+ txMapping.transactionNodes(),
+ m.last(),
+ tx.onePhaseCommit(),
+ tx.needReturnValue() && tx.implicit(),
+ tx.implicitSingle(),
+ m.explicitLock(),
+ tx.subjectId(),
+ tx.taskNameHash(),
+ m.clientFirst(),
+ tx.activeCachesDeploymentEnabled());
+
+ for (IgniteTxEntry txEntry : m.entries()) {
+ if (txEntry.op() == TRANSFORM)
+ req.addDhtVersion(txEntry.txKey(), null);
+ }
+
+ req.miniId(fut.futureId());
+
+ return req;
+ }
+
/**
* Continues prepare after previous mapping successfully finished.
*
@@ -497,36 +551,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
long timeout = tx.remainingTime();
if (timeout != -1) {
- GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
- futId,
- tx.topologyVersion(),
- tx,
- timeout,
- null,
- m.writes(),
- m.hasNearCacheEntries(),
- txMapping.transactionNodes(),
- m.last(),
- tx.onePhaseCommit(),
- tx.needReturnValue() && tx.implicit(),
- tx.implicitSingle(),
- m.explicitLock(),
- tx.subjectId(),
- tx.taskNameHash(),
- m.clientFirst(),
- tx.activeCachesDeploymentEnabled());
-
- for (IgniteTxEntry txEntry : m.entries()) {
- if (txEntry.op() == TRANSFORM)
- req.addDhtVersion(txEntry.txKey(), null);
- }
-
// Must lock near entries separately.
if (m.hasNearCacheEntries()) {
try {
tx.optimisticLockEntries(m.nearCacheEntries());
- tx.userPrepare();
+ cctx.tm().prepareTx(tx);
}
catch (IgniteCheckedException e) {
onError(e, false);
@@ -535,31 +565,17 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
final MiniFuture fut = new MiniFuture(this, m, ++miniId, mappings);
- req.miniId(fut.futureId());
-
add(fut); // Append new future.
- // If this is the primary node for the keys.
if (n.isLocal()) {
- // At this point, if any new node joined, then it is
- // waiting for this transaction to complete, so
- // partition reassignments are not possible here.
- IgniteInternalFuture<GridNearTxPrepareResponse> prepFut =
- cctx.tm().txHandler().prepareTx(n.id(), tx, req);
-
- prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
- @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
- try {
- fut.onResult(prepFut.get());
- }
- catch (IgniteCheckedException e) {
- fut.onResult(e);
- }
- }
- });
+ GridNearTxPrepareRequest req = createRequest(timeout, fut, m.writes());
+
+ prepareLocal(req, fut, m.hasNearCacheEntries(), true);
}
else {
try {
+ GridNearTxPrepareRequest req = createRequest(timeout, fut, m.writes());
+
cctx.io().send(n, req, tx.ioPolicy());
if (msgLog.isDebugEnabled()) {
@@ -870,7 +886,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
* @param res Result callback.
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- void onResult(final GridNearTxPrepareResponse res) {
+ void onResult(final GridNearTxPrepareResponse res, boolean updateMapping) {
if (isDone())
return;
@@ -912,7 +928,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
remap();
}
else {
- parent.onPrepareResponse(m, res);
+ parent.onPrepareResponse(m, res, updateMapping);
// Proceed prepare before finishing mini future.
if (mappings != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/84df73e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index f6e7d2a..ee5790f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -102,7 +101,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
if (f != null) {
assert f.primary().id().equals(nodeId);
- f.onResult(res);
+ f.onResult(res, true);
}
else {
if (msgLog.isDebugEnabled()) {
@@ -181,6 +180,80 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
}
/**
+ * @param txNodes Tx nodes.
+ * @param m Mapping.
+ * @param timeout Timeout.
+ * @param reads Reads.
+ * @param writes Writes.
+ * @return Request.
+ */
+ private GridNearTxPrepareRequest createRequest(Map<UUID, Collection<UUID>> txNodes,
+ GridDistributedTxMapping m,
+ long timeout,
+ Collection<IgniteTxEntry> reads,
+ Collection<IgniteTxEntry> writes) {
+ GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
+ futId,
+ tx.topologyVersion(),
+ tx,
+ timeout,
+ reads,
+ writes,
+ m.hasNearCacheEntries(),
+ txNodes,
+ true,
+ tx.onePhaseCommit(),
+ tx.needReturnValue() && tx.implicit(),
+ tx.implicitSingle(),
+ m.explicitLock(),
+ tx.subjectId(),
+ tx.taskNameHash(),
+ false,
+ tx.activeCachesDeploymentEnabled());
+
+ for (IgniteTxEntry txEntry : m.entries()) {
+ if (txEntry.op() == TRANSFORM)
+ req.addDhtVersion(txEntry.txKey(), null);
+ }
+
+ return req;
+ }
+
+ /**
+ * @param req Request.
+ * @param m Mapping.
+ * @param miniId Mini future ID.
+ * @param nearTx Near cache mapping flag.
+ * @param updateMapping Update mapping flag.
+ */
+ private void prepareLocal(GridNearTxPrepareRequest req,
+ GridDistributedTxMapping m,
+ int miniId,
+ final boolean nearTx,
+ final boolean updateMapping) {
+ final MiniFuture fut = new MiniFuture(m, miniId);
+
+ req.miniId(fut.futureId());
+
+ add(fut);
+
+ IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = nearTx ?
+ cctx.tm().txHandler().prepareNearTx(cctx.localNodeId(), req, true) :
+ cctx.tm().txHandler().prepareColocatedTx(tx, req);
+
+ prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+ @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+ try {
+ fut.onResult(prepFut.get(), updateMapping);
+ }
+ catch (IgniteCheckedException e) {
+ fut.onError(e);
+ }
+ }
+ });
+ }
+
+ /**
*
*/
private void preparePessimistic() {
@@ -216,11 +289,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
GridDistributedTxMapping nodeMapping = mappings.get(primary.id());
- if (nodeMapping == null) {
- nodeMapping = new GridDistributedTxMapping(primary);
-
- mappings.put(primary.id(), nodeMapping);
- }
+ if (nodeMapping == null)
+ mappings.put(primary.id(), nodeMapping = new GridDistributedTxMapping(primary));
txEntry.nodeId(primary.id());
@@ -244,56 +314,48 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
int miniId = 0;
+ Map<UUID, Collection<UUID>> txNodes = txMapping.transactionNodes();
+
for (final GridDistributedTxMapping m : mappings.values()) {
final ClusterNode primary = m.primary();
- GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
- futId,
- tx.topologyVersion(),
- tx,
- timeout,
- m.reads(),
- m.writes(),
- m.hasNearCacheEntries(),
- txMapping.transactionNodes(),
- true,
- tx.onePhaseCommit(),
- tx.needReturnValue() && tx.implicit(),
- tx.implicitSingle(),
- m.explicitLock(),
- tx.subjectId(),
- tx.taskNameHash(),
- false,
- tx.activeCachesDeploymentEnabled());
-
- for (IgniteTxEntry txEntry : m.entries()) {
- if (txEntry.op() == TRANSFORM)
- req.addDhtVersion(txEntry.txKey(), null);
+ if (primary.isLocal()) {
+ if (m.hasNearCacheEntries() && m.hasColocatedCacheEntries()) {
+ GridNearTxPrepareRequest nearReq = createRequest(txMapping.transactionNodes(),
+ m,
+ timeout,
+ m.nearEntriesReads(),
+ m.nearEntriesWrites());
+
+ prepareLocal(nearReq, m, ++miniId, true, true);
+
+ GridNearTxPrepareRequest colocatedReq = createRequest(txNodes,
+ m,
+ timeout,
+ m.colocatedEntriesReads(),
+ m.colocatedEntriesWrites());
+
+ prepareLocal(colocatedReq, m, ++miniId, false, false);
+ }
+ else {
+ GridNearTxPrepareRequest req = createRequest(txNodes, m, timeout, m.reads(), m.writes());
+
+ prepareLocal(req, m, ++miniId, m.hasNearCacheEntries(), true);
+ }
}
+ else {
+ GridNearTxPrepareRequest req = createRequest(txNodes,
+ m,
+ timeout,
+ m.reads(),
+ m.writes());
- final MiniFuture fut = new MiniFuture(m, ++miniId);
+ final MiniFuture fut = new MiniFuture(m, ++miniId);
- req.miniId(fut.futureId());
+ req.miniId(fut.futureId());
- add(fut);
+ add(fut);
- if (primary.isLocal()) {
- IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(primary.id(),
- tx,
- req);
-
- prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
- @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
- try {
- fut.onResult(prepFut.get());
- }
- catch (IgniteCheckedException e) {
- fut.onError(e);
- }
- }
- });
- }
- else {
try {
cctx.io().send(primary, req, tx.ioPolicy());
@@ -397,12 +459,13 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
/**
* @param res Response.
+ * @param updateMapping Update mapping flag.
*/
- void onResult(GridNearTxPrepareResponse res) {
+ void onResult(GridNearTxPrepareResponse res, boolean updateMapping) {
if (res.error() != null)
onError(res.error());
else {
- onPrepareResponse(m, res);
+ onPrepareResponse(m, res, updateMapping);
onDone(res);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/84df73e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index f35324a..a9675d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -180,9 +180,12 @@ public abstract class GridNearTxPrepareFutureAdapter extends
/**
* @param m Mapping.
* @param res Response.
+ * @param updateMapping Update mapping flag.
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
+ final void onPrepareResponse(GridDistributedTxMapping m,
+ GridNearTxPrepareResponse res,
+ boolean updateMapping) {
if (res == null)
return;
@@ -245,24 +248,26 @@ public abstract class GridNearTxPrepareFutureAdapter extends
}
if (!m.empty()) {
- GridCacheVersion writeVer = res.writeVersion();
-
- if (writeVer == null)
- writeVer = res.dhtVersion();
-
// This step is very important as near and DHT versions grow separately.
cctx.versions().onReceived(nodeId, res.dhtVersion());
- // Register DHT version.
- m.dhtVersion(res.dhtVersion(), writeVer);
+ if (updateMapping) {
+ GridCacheVersion writeVer = res.writeVersion();
+
+ if (writeVer == null)
+ writeVer = res.dhtVersion();
- GridDistributedTxMapping map = tx.mappings().get(nodeId);
+ // Register DHT version.
+ m.dhtVersion(res.dhtVersion(), writeVer);
- if (map != null)
- map.dhtVersion(res.dhtVersion(), writeVer);
+ GridDistributedTxMapping map = tx.mappings().get(nodeId);
- if (m.hasNearCacheEntries())
- tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
+ if (map != null)
+ map.dhtVersion(res.dhtVersion(), writeVer);
+
+ if (m.hasNearCacheEntries())
+ tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
+ }
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/84df73e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 153ad04..00a991e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -111,20 +111,17 @@ public class IgniteTxHandler {
/**
* @param nearNodeId Node ID.
* @param req Request.
- * @return Prepare future.
*/
- private IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) {
+ private void processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) {
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Received near prepare request [txId=" + req.version() +
", node=" + nearNodeId + ']');
}
- IgniteInternalFuture<GridNearTxPrepareResponse> fut = prepareTx(nearNodeId, null, req);
+ IgniteInternalFuture<GridNearTxPrepareResponse> fut = prepareNearTx(nearNodeId, req, false);
assert req.txState() != null || fut == null || fut.error() != null ||
(ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null);
-
- return fut;
}
/**
@@ -209,41 +206,13 @@ public class IgniteTxHandler {
}
/**
- * @param nearNodeId Near node ID that initiated transaction.
- * @param locTx Optional local transaction.
- * @param req Near prepare request.
- * @return Future for transaction.
- */
- public IgniteInternalFuture<GridNearTxPrepareResponse> prepareTx(
- UUID nearNodeId,
- @Nullable GridNearTxLocal locTx,
- GridNearTxPrepareRequest req
- ) {
- assert nearNodeId != null;
- assert req != null;
-
- if (locTx != null) {
- if (req.near()) {
- // Make sure not to provide Near entries to DHT cache.
- req.cloneEntries();
-
- return prepareNearTx(nearNodeId, req);
- }
- else
- return prepareColocatedTx(locTx, req);
- }
- else
- return prepareNearTx(nearNodeId, req);
- }
-
- /**
* Prepares local colocated tx.
*
* @param locTx Local transaction.
* @param req Near prepare request.
* @return Prepare future.
*/
- private IgniteInternalFuture<GridNearTxPrepareResponse> prepareColocatedTx(
+ public IgniteInternalFuture<GridNearTxPrepareResponse> prepareColocatedTx(
final GridNearTxLocal locTx,
final GridNearTxPrepareRequest req
) {
@@ -308,16 +277,19 @@ public class IgniteTxHandler {
}
/**
- * Prepares near transaction.
- *
* @param nearNodeId Near node ID that initiated transaction.
* @param req Near prepare request.
+ * @param locReq Local request flag.
* @return Prepare future.
*/
- private IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx(
+ public IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx(
final UUID nearNodeId,
- final GridNearTxPrepareRequest req
+ final GridNearTxPrepareRequest req,
+ boolean locReq
) {
+ if (locReq)
+ req.cloneEntries();
+
ClusterNode nearNode = ctx.node(nearNodeId);
if (nearNode == null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/84df73e3/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/TransformCollectionView.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/TransformCollectionView.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/TransformCollectionView.java
index 2c317df..735f2d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/TransformCollectionView.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/TransformCollectionView.java
@@ -71,9 +71,4 @@ public class TransformCollectionView<T1, T2> extends GridSerializableCollection<
@Override public boolean isEmpty() {
return F.isEmpty(preds) ? col.isEmpty() : !iterator().hasNext();
}
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(TransformCollectionView.class, this);
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/84df73e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCrossCacheTxSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCrossCacheTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCrossCacheTxSelfTest.java
index 273f0ca..1cac85a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCrossCacheTxSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCrossCacheTxSelfTest.java
@@ -40,6 +40,7 @@ import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
/**
* Tests specific combinations of cross-cache transactions.
@@ -70,14 +71,14 @@ public class IgniteCrossCacheTxSelfTest extends GridCommonAbstractTest {
* @return Node count for this test.
*/
private int nodeCount() {
- return 2;
+ return 4;
}
/**
* @return {@code True} if near cache should be enabled.
*/
protected boolean nearEnabled() {
- return false;
+ return true;
}
/** {@inheritDoc} */
@@ -137,6 +138,13 @@ public class IgniteCrossCacheTxSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testOptimisticSerializable() throws Exception {
+ checkTxsSingleOp(OPTIMISTIC, SERIALIZABLE);
+ }
+
+ /**
* @param concurrency Concurrency.
* @param isolation Isolation.
* @throws Exception If failed.