You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/20 07:49:39 UTC
[66/70] [abbrv] ignite git commit: ignite-1561 Fixed tx prepare for
cross cache tx with near + colocated cache
ignite-1561 Fixed tx prepare for cross cache tx with near + colocated cache
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a5088265
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a5088265
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a5088265
Branch: refs/heads/ignite-2893
Commit: a5088265d7927aab702425249b3f0d6996cb989e
Parents: badf49c
Author: sboikov <sb...@gridgain.com>
Authored: Thu Apr 20 08:29:42 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Apr 20 08:29:42 2017 +0300
----------------------------------------------------------------------
.../cache/GridCacheSharedContext.java | 6 +-
.../processors/cache/GridCacheUtils.java | 83 +++++--
.../distributed/GridDistributedTxMapping.java | 68 +++++-
.../GridDistributedTxRemoteAdapter.java | 2 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 2 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 2 +
...arOptimisticSerializableTxPrepareFuture.java | 218 ++++++++++++-------
.../near/GridNearOptimisticTxPrepareFuture.java | 95 ++++++--
.../GridNearPessimisticTxPrepareFuture.java | 186 ++++++++++------
.../near/GridNearTxFinishFuture.java | 6 +-
.../cache/distributed/near/GridNearTxLocal.java | 30 +--
.../near/GridNearTxPrepareFutureAdapter.java | 30 +--
.../cache/transactions/IgniteTxHandler.java | 49 +----
.../IgniteTxImplicitSingleStateImpl.java | 6 +
.../transactions/IgniteTxLocalAdapter.java | 7 +-
.../cache/transactions/IgniteTxLocalState.java | 10 +
.../cache/transactions/IgniteTxManager.java | 7 +-
.../cache/transactions/IgniteTxState.java | 2 +-
.../cache/transactions/IgniteTxStateImpl.java | 52 +++--
.../lang/gridfunc/PredicateCollectionView.java | 7 +-
.../util/lang/gridfunc/PredicateMapView.java | 6 -
.../util/lang/gridfunc/PredicateSetView.java | 6 -
.../lang/gridfunc/ReadOnlyCollectionView.java | 6 -
.../lang/gridfunc/ReadOnlyCollectionView2X.java | 6 -
.../lang/gridfunc/TransformCollectionView.java | 7 +-
.../util/lang/gridfunc/TransformMapView.java | 6 -
...sCacheTxNearEnabledRandomOperationsTest.java | 28 +++
.../cache/CrossCacheTxRandomOperationsTest.java | 23 +-
.../dht/GridNearCacheTxNodeFailureSelfTest.java | 31 ---
.../dht/IgniteCrossCacheTxSelfTest.java | 8 +
.../IgniteCacheFailoverTestSuite.java | 2 -
.../testsuites/IgniteCacheTestSuite2.java | 2 +
32 files changed, 631 insertions(+), 368 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 34bb321..79083e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -52,7 +52,7 @@ import org.apache.ignite.internal.processors.cache.transactions.TransactionMetri
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
-import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -742,7 +742,7 @@ public class GridCacheSharedContext<K, V> {
* @param cacheCtx Cache context.
* @return Error message if transactions are incompatible.
*/
- @Nullable public String verifyTxCompatibility(IgniteInternalTx tx, GridLongList activeCacheIds,
+ @Nullable public String verifyTxCompatibility(IgniteInternalTx tx, GridIntList activeCacheIds,
GridCacheContext<K, V> cacheCtx) {
if (cacheCtx.systemTx() && !tx.system())
return "system cache can be enlisted only in system transaction";
@@ -751,7 +751,7 @@ public class GridCacheSharedContext<K, V> {
return "non-system cache can't be enlisted in system transaction";
for (int i = 0; i < activeCacheIds.size(); i++) {
- int cacheId = (int)activeCacheIds.get(i);
+ int cacheId = activeCacheIds.get(i);
GridCacheContext<K, V> activeCacheCtx = cacheContext(cacheId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index d20a782..51a95a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -211,14 +211,6 @@ public class GridCacheUtils {
/** Default transaction config. */
private static final TransactionConfiguration DEFAULT_TX_CFG = new TransactionConfiguration();
- /** Partition to state transformer. */
- private static final IgniteClosure PART2STATE =
- new C1<GridDhtLocalPartition, GridDhtPartitionState>() {
- @Override public GridDhtPartitionState apply(GridDhtLocalPartition p) {
- return p.state();
- }
- };
-
/** Empty predicate array. */
private static final IgnitePredicate[] EMPTY_FILTER = new IgnitePredicate[0];
@@ -247,24 +239,79 @@ public class GridCacheUtils {
private static final CacheEntryPredicate[] ALWAYS_FALSE0_ARR = new CacheEntryPredicate[] {ALWAYS_FALSE0};
/** Read filter. */
- private static final IgnitePredicate READ_FILTER = new P1<Object>() {
- @Override public boolean apply(Object e) {
- return ((IgniteTxEntry)e).op() == READ;
+ public static final IgnitePredicate READ_FILTER = new P1<IgniteTxEntry>() {
+ @Override public boolean apply(IgniteTxEntry e) {
+ return e.op() == READ;
+ }
+
+ @Override public String toString() {
+ return "READ_FILTER";
+ }
+ };
+
+ /** Read filter for near cache entries. */
+ public static final IgnitePredicate READ_FILTER_NEAR = new P1<IgniteTxEntry>() {
+ @Override public boolean apply(IgniteTxEntry e) {
+ return e.op() == READ && e.context().isNear();
+ }
+
+ @Override public String toString() {
+ return "READ_FILTER_NEAR";
+ }
+ };
+
+ /** Read filter for colocated (non-near) cache entries. */
+ public static final IgnitePredicate READ_FILTER_COLOCATED = new P1<IgniteTxEntry>() {
+ @Override public boolean apply(IgniteTxEntry e) {
+ return e.op() == READ && !e.context().isNear();
+ }
+
+ @Override public String toString() {
+ return "READ_FILTER_COLOCATED";
+ }
+ };
+
+ /** Write filter. */
+ public static final IgnitePredicate WRITE_FILTER = new P1<IgniteTxEntry>() {
+ @Override public boolean apply(IgniteTxEntry e) {
+ return e.op() != READ;
+ }
+
+ @Override public String toString() {
+ return "WRITE_FILTER";
+ }
+ };
+
+ /** Write filter for near cache entries. */
+ public static final IgnitePredicate WRITE_FILTER_NEAR = new P1<IgniteTxEntry>() {
+ @Override public boolean apply(IgniteTxEntry e) {
+ return e.op() != READ && e.context().isNear();
+ }
+
+ @Override public String toString() {
+ return "WRITE_FILTER_NEAR";
+ }
+ };
+
+ /** Write filter for colocated (non-near) cache entries. */
+ public static final IgnitePredicate WRITE_FILTER_COLOCATED = new P1<IgniteTxEntry>() {
+ @Override public boolean apply(IgniteTxEntry e) {
+ return e.op() != READ && !e.context().isNear();
}
@Override public String toString() {
- return "Cache transaction read filter";
+ return "WRITE_FILTER_COLOCATED";
}
};
/** Write filter. */
- private static final IgnitePredicate WRITE_FILTER = new P1<Object>() {
- @Override public boolean apply(Object e) {
- return ((IgniteTxEntry)e).op() != READ;
+ public static final IgnitePredicate FILTER_NEAR_CACHE_ENTRY = new P1<IgniteTxEntry>() {
+ @Override public boolean apply(IgniteTxEntry e) {
+ return e.context().isNear();
}
@Override public String toString() {
- return "Cache transaction write filter";
+ return "FILTER_NEAR_CACHE_ENTRY";
}
};
@@ -613,7 +660,7 @@ public class GridCacheUtils {
* @return Filter for transaction reads.
*/
@SuppressWarnings({"unchecked"})
- public static <K, V> IgnitePredicate<IgniteTxEntry> reads() {
+ public static IgnitePredicate<IgniteTxEntry> reads() {
return READ_FILTER;
}
@@ -621,7 +668,7 @@ public class GridCacheUtils {
* @return Filter for transaction writes.
*/
@SuppressWarnings({"unchecked"})
- public static <K, V> IgnitePredicate<IgniteTxEntry> writes() {
+ public static IgnitePredicate<IgniteTxEntry> writes() {
return WRITE_FILTER;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index f8cec50..45903aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -52,8 +52,8 @@ public class GridDistributedTxMapping {
/** {@code True} if this is last mapping for node. */
private boolean last;
- /** {@code True} if mapping is for near caches, {@code false} otherwise. */
- private boolean near;
+ /** Near cache entries count. */
+ private int nearEntries;
/** {@code True} if this is first mapping for optimistic tx on client node. */
private boolean clientFirst;
@@ -96,17 +96,17 @@ public class GridDistributedTxMapping {
}
/**
- * @return {@code True} if mapping is for near caches, {@code false} otherwise.
+ * @return {@code True} if has colocated cache entries.
*/
- public boolean near() {
- return near;
+ public boolean hasColocatedCacheEntries() {
+ return entries.size() > nearEntries;
}
/**
- * @param near {@code True} if mapping is for near caches, {@code false} otherwise.
+ * @return {@code True} if has near cache entries.
*/
- public void near(boolean near) {
- this.near = near;
+ public boolean hasNearCacheEntries() {
+ return nearEntries > 0;
}
/**
@@ -124,6 +124,15 @@ public class GridDistributedTxMapping {
}
/**
+ * @return Near cache entries.
+ */
+ @Nullable public Collection<IgniteTxEntry> nearCacheEntries() {
+ assert nearEntries > 0;
+
+ return F.view(entries, CU.FILTER_NEAR_CACHE_ENTRY);
+ }
+
+ /**
* @return {@code True} if lock is explicit.
*/
public boolean explicitLock() {
@@ -159,21 +168,58 @@ public class GridDistributedTxMapping {
* @return Reads.
*/
public Collection<IgniteTxEntry> reads() {
- return F.view(entries, CU.reads());
+ return F.view(entries, CU.READ_FILTER);
}
/**
* @return Writes.
*/
public Collection<IgniteTxEntry> writes() {
- return F.view(entries, CU.writes());
+ return F.view(entries, CU.WRITE_FILTER);
+ }
+
+ /**
+ * @return Near cache reads.
+ */
+ public Collection<IgniteTxEntry> nearEntriesReads() {
+ assert hasNearCacheEntries();
+
+ return F.view(entries, CU.READ_FILTER_NEAR);
+ }
+
+ /**
+ * @return Near cache writes.
+ */
+ public Collection<IgniteTxEntry> nearEntriesWrites() {
+ assert hasNearCacheEntries();
+
+ return F.view(entries, CU.WRITE_FILTER_NEAR);
+ }
+
+ /**
+ * @return Colocated cache reads.
+ */
+ public Collection<IgniteTxEntry> colocatedEntriesReads() {
+ assert hasColocatedCacheEntries();
+
+ return F.view(entries, CU.READ_FILTER_COLOCATED);
+ }
+
+ /**
+ * @return Colocated cache writes.
+ */
+ public Collection<IgniteTxEntry> colocatedEntriesWrites() {
+ assert hasColocatedCacheEntries();
+
+ return F.view(entries, CU.WRITE_FILTER_COLOCATED);
}
/**
* @param entry Adds entry.
*/
public void add(IgniteTxEntry entry) {
- entries.add(entry);
+ if (entries.add(entry) && entry.context().isNear())
+ nearEntries++;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index de8b29e..9cb04d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -387,7 +387,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
try {
- cctx.tm().prepareTx(this);
+ cctx.tm().prepareTx(this, null);
if (pessimistic() || isSystemInvalidate())
state(PREPARED);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index b1c7e5b..26f08fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -399,7 +399,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
addEntry(msgId, e);
}
- userPrepare();
+ userPrepare(null);
// Make sure to add future before calling prepare on it.
cctx.mvcc().addFuture(fut);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 6e7b324..464df6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -920,6 +920,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
if (res.hasOwnedValue(ver.getKey()))
continue;
+ assert txEntry != null : ver;
+
GridCacheContext cacheCtx = txEntry.context();
while (true) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index cbd9d23..6060729 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -55,7 +55,6 @@ import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteReducer;
import org.jetbrains.annotations.Nullable;
@@ -199,7 +198,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
MiniFuture mini = miniFuture(res.miniId());
if (mini != null)
- mini.onResult(res);
+ mini.onResult(res, true);
}
}
@@ -339,11 +338,17 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
GridDhtTxMapping txMapping = new GridDhtTxMapping();
- Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
+ Map<UUID, GridDistributedTxMapping> mappings = new HashMap<>();
- for (IgniteTxEntry write : writes)
+ boolean hasNearCache = false;
+
+ for (IgniteTxEntry write : writes) {
map(write, topVer, mappings, txMapping, remap, topLocked);
+ if (write.context().isNear())
+ hasNearCache = true;
+ }
+
for (IgniteTxEntry read : reads)
map(read, topVer, mappings, txMapping, remap, topLocked);
@@ -363,12 +368,26 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
tx.transactionNodes(txMapping.transactionNodes());
- checkOnePhase(txMapping);
+ if (!hasNearCache)
+ checkOnePhase(txMapping);
+ MiniFuture locNearEntriesFut = null;
+
+ // Create futures in advance so that all of them exist when {@link GridNearTxPrepareResponse#clientRemapVersion} is processed.
for (GridDistributedTxMapping m : mappings.values()) {
assert !m.empty();
- add(new MiniFuture(this, m, ++miniId));
+ MiniFuture fut = new MiniFuture(this, m, ++miniId);
+
+ add(fut);
+
+ if (m.primary().isLocal() && m.hasNearCacheEntries() && m.hasColocatedCacheEntries()) {
+ assert locNearEntriesFut == null;
+
+ locNearEntriesFut = fut;
+
+ add(new MiniFuture(this, m, ++miniId));
+ }
}
Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
@@ -383,7 +402,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
MiniFuture fut = (MiniFuture)fut0;
- IgniteCheckedException err = prepare(fut, txMapping);
+ IgniteCheckedException err = prepare(fut, txMapping.transactionNodes(), locNearEntriesFut);
if (err != null) {
while (it.hasNext()) {
@@ -417,9 +436,13 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
/**
* @param fut Mini future.
+ * @param txNodes Tx nodes.
+ * @param locNearEntriesFut Local future for near cache entries prepare.
* @return Prepare error if any.
*/
- @Nullable private IgniteCheckedException prepare(final MiniFuture fut, GridDhtTxMapping txMapping) {
+ @Nullable private IgniteCheckedException prepare(final MiniFuture fut,
+ Map<UUID, Collection<UUID>> txNodes,
+ @Nullable MiniFuture locNearEntriesFut) {
GridDistributedTxMapping m = fut.mapping();
final ClusterNode primary = m.primary();
@@ -434,36 +457,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
return err;
}
- GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
- futId,
- tx.topologyVersion(),
- tx,
- timeout,
- m.reads(),
- m.writes(),
- m.near(),
- txMapping.transactionNodes(),
- m.last(),
- tx.onePhaseCommit(),
- tx.needReturnValue() && tx.implicit(),
- tx.implicitSingle(),
- m.explicitLock(),
- tx.subjectId(),
- tx.taskNameHash(),
- m.clientFirst(),
- tx.activeCachesDeploymentEnabled());
-
- for (IgniteTxEntry txEntry : m.entries()) {
- if (txEntry.op() == TRANSFORM)
- req.addDhtVersion(txEntry.txKey(), null);
- }
-
// Must lock near entries separately.
- if (m.near()) {
+ if (m.hasNearCacheEntries()) {
try {
- tx.optimisticLockEntries(m.entries());
-
- tx.userPrepare();
+ cctx.tm().prepareTx(tx, m.nearCacheEntries());
}
catch (IgniteCheckedException e) {
fut.onResult(e);
@@ -472,27 +469,36 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
}
- req.miniId(fut.futureId());
-
- // If this is the primary node for the keys.
if (primary.isLocal()) {
- IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(primary.id(),
- tx,
- req);
-
- prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
- @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
- try {
- fut.onResult(prepFut.get());
- }
- catch (IgniteCheckedException e) {
- fut.onResult(e);
- }
- }
- });
+ if (locNearEntriesFut != null) {
+ boolean nearEntries = fut == locNearEntriesFut;
+
+ GridNearTxPrepareRequest req = createRequest(txNodes,
+ fut,
+ timeout,
+ nearEntries ? m.nearEntriesReads() : m.colocatedEntriesReads(),
+ nearEntries ? m.nearEntriesWrites() : m.colocatedEntriesWrites());
+
+ prepareLocal(req, fut, nearEntries);
+ }
+ else {
+ GridNearTxPrepareRequest req = createRequest(txNodes,
+ fut,
+ timeout,
+ m.reads(),
+ m.writes());
+
+ prepareLocal(req, fut, m.hasNearCacheEntries());
+ }
}
else {
try {
+ GridNearTxPrepareRequest req = createRequest(txNodes,
+ fut,
+ timeout,
+ m.reads(),
+ m.writes());
+
cctx.io().send(primary, req, tx.ioPolicy());
}
catch (ClusterTopologyCheckedException e) {
@@ -513,16 +519,86 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
/**
+ * @param txNodes Tx nodes.
+ * @param fut Future.
+ * @param timeout Timeout.
+ * @param reads Read entries.
+ * @param writes Write entries.
+ * @return Request.
+ */
+ private GridNearTxPrepareRequest createRequest(
+ Map<UUID, Collection<UUID>> txNodes,
+ MiniFuture fut,
+ long timeout,
+ Collection<IgniteTxEntry> reads,
+ Collection<IgniteTxEntry> writes) {
+ GridDistributedTxMapping m = fut.mapping();
+
+ GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
+ futId,
+ tx.topologyVersion(),
+ tx,
+ timeout,
+ reads,
+ writes,
+ m.hasNearCacheEntries(),
+ txNodes,
+ m.last(),
+ tx.onePhaseCommit(),
+ tx.needReturnValue() && tx.implicit(),
+ tx.implicitSingle(),
+ m.explicitLock(),
+ tx.subjectId(),
+ tx.taskNameHash(),
+ m.clientFirst(),
+ tx.activeCachesDeploymentEnabled());
+
+ for (IgniteTxEntry txEntry : writes) {
+ if (txEntry.op() == TRANSFORM)
+ req.addDhtVersion(txEntry.txKey(), null);
+ }
+
+ req.miniId(fut.futureId());
+
+ return req;
+ }
+
+ /**
+ * @param req Request.
+ * @param fut Future.
+ * @param nearEntries {@code True} to prepare near cache entries.
+ */
+ private void prepareLocal(GridNearTxPrepareRequest req,
+ final MiniFuture fut,
+ final boolean nearEntries) {
+ IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = nearEntries ?
+ cctx.tm().txHandler().prepareNearTx(cctx.localNodeId(), req, true) :
+ cctx.tm().txHandler().prepareColocatedTx(tx, req);
+
+ prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+ @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+ try {
+ fut.onResult(prepFut.get(), nearEntries);
+ }
+ catch (IgniteCheckedException e) {
+ fut.onResult(e);
+ }
+ }
+ });
+ }
+
+ /**
* @param entry Transaction entry.
* @param topVer Topology version.
* @param curMapping Current mapping.
+ * @param txMapping Mapping.
* @param remap Remap flag.
* @param topLocked Topology locked flag.
*/
private void map(
IgniteTxEntry entry,
AffinityTopologyVersion topVer,
- Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> curMapping,
+ Map<UUID, GridDistributedTxMapping> curMapping,
GridDhtTxMapping txMapping,
boolean remap,
boolean topLocked
@@ -565,30 +641,25 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
}
- IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, cacheCtx.isNear());
-
- GridDistributedTxMapping cur = curMapping.get(key);
+ GridDistributedTxMapping cur = curMapping.get(primary.id());
if (cur == null) {
cur = new GridDistributedTxMapping(primary);
- curMapping.put(key, cur);
-
- if (primary.isLocal()) {
- if (entry.context().isNear())
- tx.nearLocallyMapped(true);
- else if (entry.context().isColocated())
- tx.colocatedLocallyMapped(true);
- }
-
- // Initialize near flag right away.
- cur.near(cacheCtx.isNear());
+ curMapping.put(primary.id(), cur);
cur.clientFirst(!topLocked && cctx.kernalContext().clientNode());
cur.last(true);
}
+ if (primary.isLocal()) {
+ if (cacheCtx.isNear())
+ tx.nearLocallyMapped(true);
+ else if (cacheCtx.isColocated())
+ tx.colocatedLocallyMapped(true);
+ }
+
cur.add(entry);
if (entry.explicitVersion() != null) {
@@ -683,9 +754,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
*
*/
private static class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
- /** */
- private static final long serialVersionUID = 0L;
-
/** Receive result flag updater. */
private static AtomicIntegerFieldUpdater<MiniFuture> RCV_RES_UPD =
AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes");
@@ -773,9 +841,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
/**
* @param res Result callback.
+ * @param updateMapping Update mapping flag.
*/
@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
- void onResult(final GridNearTxPrepareResponse res) {
+ void onResult(final GridNearTxPrepareResponse res, boolean updateMapping) {
if (isDone())
return;
@@ -878,7 +947,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
onDone(res);
}
else {
- parent.onPrepareResponse(m, res);
+ parent.onPrepareResponse(m, res, updateMapping);
// Finish this mini future (need result only on client node).
onDone(parent.cctx.kernalContext().clientNode() ? res : null);
@@ -892,8 +961,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
*/
private void remap(final GridNearTxPrepareResponse res) {
parent.prepareOnTopology(true, new Runnable() {
- @Override
- public void run() {
+ @Override public void run() {
onDone(res);
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index bc47c13..f4ce1ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -345,6 +345,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
/**
* @param write Write.
* @param topLocked {@code True} if thread already acquired lock preventing topology change.
+ * @param remap Remap flag.
*/
private void prepareSingle(IgniteTxEntry write, boolean topLocked, boolean remap) {
write.clearEntryReadVersion();
@@ -382,7 +383,10 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
tx.transactionNodes(txMapping.transactionNodes());
- checkOnePhase(txMapping);
+ if (!write.context().isNear())
+ checkOnePhase(txMapping);
+
+ assert !(mapping.hasColocatedCacheEntries() && mapping.hasNearCacheEntries()) : mapping;
proceedPrepare(mapping, null);
}
@@ -390,6 +394,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
/**
* @param writes Write entries.
* @param topLocked {@code True} if thread already acquired lock preventing topology change.
+ * @param remap Remap flag.
*/
private void prepare(
Iterable<IgniteTxEntry> writes,
@@ -402,24 +407,37 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
txMapping = new GridDhtTxMapping();
- Map<UUID, GridDistributedTxMapping> map = new HashMap<>();
+ Map<Object, GridDistributedTxMapping> map = new HashMap<>();
// Assign keys to primary nodes.
GridDistributedTxMapping cur = null;
Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>();
+ boolean hasNearCache = false;
+
for (IgniteTxEntry write : writes) {
write.clearEntryReadVersion();
GridDistributedTxMapping updated = map(write, topVer, cur, topLocked, remap);
+ if (write.context().isNear())
+ hasNearCache = true;
+
if (cur != updated) {
mappings.offer(updated);
updated.last(true);
- GridDistributedTxMapping prev = map.put(updated.primary().id(), updated);
+ ClusterNode primary = updated.primary();
+
+ assert !primary.isLocal() || !cctx.kernalContext().clientNode();
+
+ // Minor optimization to avoid creating a MappingKey: a client node cannot have a mapping for the local node.
+ Object key = cctx.kernalContext().clientNode() ? primary.id() :
+ new MappingKey(primary.id(), primary.isLocal() && updated.hasNearCacheEntries());
+
+ GridDistributedTxMapping prev = map.put(key, updated);
if (prev != null)
prev.last(false);
@@ -451,7 +469,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
tx.transactionNodes(txMapping.transactionNodes());
- checkOnePhase(txMapping);
+ if (!hasNearCache)
+ checkOnePhase(txMapping);
proceedPrepare(mappings);
}
@@ -497,7 +516,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
timeout,
null,
m.writes(),
- m.near(),
+ m.hasNearCacheEntries(),
txMapping.transactionNodes(),
m.last(),
tx.onePhaseCommit(),
@@ -515,14 +534,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
// Must lock near entries separately.
- if (m.near()) {
+ if (m.hasNearCacheEntries()) {
try {
- tx.optimisticLockEntries(req.writes());
-
- tx.userPrepare();
+ cctx.tm().prepareTx(tx, m.nearCacheEntries());
}
catch (IgniteCheckedException e) {
onError(e, false);
+
+ return;
}
}
@@ -532,13 +551,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
add(fut); // Append new future.
- // If this is the primary node for the keys.
if (n.isLocal()) {
- // At this point, if any new node joined, then it is
- // waiting for this transaction to complete, so
- // partition reassignments are not possible here.
+ assert !(m.hasColocatedCacheEntries() && m.hasNearCacheEntries()) : m;
+
IgniteInternalFuture<GridNearTxPrepareResponse> prepFut =
- cctx.tm().txHandler().prepareTx(n.id(), tx, req);
+ m.hasNearCacheEntries() ? cctx.tm().txHandler().prepareNearTx(n.id(), req, true)
+ : cctx.tm().txHandler().prepareColocatedTx(tx, req);
prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
@Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
@@ -590,6 +608,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
* @param topVer Topology version.
* @param cur Current mapping.
* @param topLocked {@code True} if thread already acquired lock preventing topology change.
+ * @param remap Remap flag.
* @return Mapping.
*/
private GridDistributedTxMapping map(
@@ -644,14 +663,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
}
- if (cur == null || !cur.primary().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
+ if (cur == null || !cur.primary().id().equals(primary.id()) ||
+ (primary.isLocal() && cur.hasNearCacheEntries() != cacheCtx.isNear())) {
boolean clientFirst = cur == null && !topLocked && cctx.kernalContext().clientNode();
cur = new GridDistributedTxMapping(primary);
- // Initialize near flag right away.
- cur.near(cacheCtx.isNear());
-
cur.clientFirst(clientFirst);
}
@@ -908,7 +925,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
remap();
}
else {
- parent.onPrepareResponse(m, res);
+ parent.onPrepareResponse(m, res, m.hasNearCacheEntries());
// Proceed prepare before finishing mini future.
if (mappings != null)
@@ -937,4 +954,44 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
}
}
+
+ /**
+ * Key made of a node ID and a near-entries flag. NOTE(review): usage is outside this hunk - presumably a map key.
+ */
+ private static class MappingKey {
+ /** Node ID. */
+ private final UUID nodeId;
+
+ /** Near cache entries flag. */
+ private final boolean nearEntries;
+
+ /**
+ * @param nodeId Node ID.
+ * @param nearEntries Near cache entries flag (should be true only for local node).
+ */
+ MappingKey(UUID nodeId, boolean nearEntries) {
+ this.nodeId = nodeId;
+ this.nearEntries = nearEntries;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
+ @Override public boolean equals(Object o) {
+ MappingKey that = (MappingKey) o; // Cast without null or class check (hence the suppressed warning).
+
+ return nearEntries == that.nearEntries && nodeId.equals(that.nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = nodeId.hashCode();
+ res = 31 * res + (nearEntries ? 1 : 0);
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MappingKey.class, this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index cb15bca..e934319 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -44,7 +45,6 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -102,7 +102,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
if (f != null) {
assert f.primary().id().equals(nodeId);
- f.onResult(res);
+ f.onResult(res, true);
}
else {
if (msgLog.isDebugEnabled()) {
@@ -169,7 +169,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
}
try {
- tx.userPrepare();
+ tx.userPrepare(Collections.<IgniteTxEntry>emptyList());
cctx.mvcc().addFuture(this);
@@ -181,20 +181,97 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
}
/**
+ * @param txNodes Transaction nodes map; passed to the request unchanged.
+ * @param m Mapping; its near-entries and explicit-lock flags are passed to the request.
+ * @param timeout Timeout; passed to the request unchanged.
+ * @param reads Read entries to put into the request.
+ * @param writes Write entries for the request; a {@code null} DHT version is added for each {@code TRANSFORM} one.
+ * @return Created prepare request.
+ */
+ private GridNearTxPrepareRequest createRequest(Map<UUID, Collection<UUID>> txNodes,
+ GridDistributedTxMapping m,
+ long timeout,
+ Collection<IgniteTxEntry> reads,
+ Collection<IgniteTxEntry> writes) {
+ GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
+ futId,
+ tx.topologyVersion(),
+ tx,
+ timeout,
+ reads,
+ writes,
+ m.hasNearCacheEntries(),
+ txNodes,
+ true,
+ tx.onePhaseCommit(),
+ tx.needReturnValue() && tx.implicit(),
+ tx.implicitSingle(),
+ m.explicitLock(),
+ tx.subjectId(),
+ tx.taskNameHash(),
+ false,
+ tx.activeCachesDeploymentEnabled());
+
+ for (IgniteTxEntry txEntry : writes) {
+ if (txEntry.op() == TRANSFORM)
+ req.addDhtVersion(txEntry.txKey(), null);
+ }
+
+ return req;
+ }
+
+ /**
+ * @param req Request; its mini ID is set to the ID of the mini future created here.
+ * @param m Mapping the new mini future is created for.
+ * @param miniId Mini future ID.
+ * @param nearEntries {@code True} to prepare via {@code prepareNearTx}, {@code false} via {@code prepareColocatedTx}.
+ */
+ private void prepareLocal(GridNearTxPrepareRequest req,
+ GridDistributedTxMapping m,
+ int miniId,
+ final boolean nearEntries) {
+ final MiniFuture fut = new MiniFuture(m, miniId);
+
+ req.miniId(fut.futureId());
+
+ add(fut);
+
+ IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = nearEntries ?
+ cctx.tm().txHandler().prepareNearTx(cctx.localNodeId(), req, true) :
+ cctx.tm().txHandler().prepareColocatedTx(tx, req);
+
+ prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+ @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+ try {
+ fut.onResult(prepFut.get(), nearEntries); // Flag is passed as the 'updateMapping' argument.
+ }
+ catch (IgniteCheckedException e) {
+ fut.onError(e);
+ }
+ }
+ });
+ }
+
+ /**
*
*/
private void preparePessimistic() {
- Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
+ Map<UUID, GridDistributedTxMapping> mappings = new HashMap<>();
AffinityTopologyVersion topVer = tx.topologyVersion();
GridDhtTxMapping txMapping = new GridDhtTxMapping();
+ boolean hasNearCache = false;
+
for (IgniteTxEntry txEntry : tx.allEntries()) {
txEntry.clearEntryReadVersion();
GridCacheContext cacheCtx = txEntry.context();
+ if (cacheCtx.isNear())
+ hasNearCache = true;
+
List<ClusterNode> nodes;
if (!cacheCtx.isLocal()) {
@@ -205,21 +282,14 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
else
nodes = cacheCtx.affinity().nodesByKey(txEntry.key(), topVer);
- ClusterNode primary = F.first(nodes);
-
- boolean near = cacheCtx.isNear();
+ assert !nodes.isEmpty();
- IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, near);
+ ClusterNode primary = nodes.get(0);
- GridDistributedTxMapping nodeMapping = mappings.get(key);
+ GridDistributedTxMapping nodeMapping = mappings.get(primary.id());
- if (nodeMapping == null) {
- nodeMapping = new GridDistributedTxMapping(primary);
-
- nodeMapping.near(cacheCtx.isNear());
-
- mappings.put(key, nodeMapping);
- }
+ if (nodeMapping == null)
+ mappings.put(primary.id(), nodeMapping = new GridDistributedTxMapping(primary));
txEntry.nodeId(primary.id());
@@ -230,7 +300,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
tx.transactionNodes(txMapping.transactionNodes());
- checkOnePhase(txMapping);
+ if (!hasNearCache)
+ checkOnePhase(txMapping);
long timeout = tx.remainingTime();
@@ -242,56 +313,48 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
int miniId = 0;
+ Map<UUID, Collection<UUID>> txNodes = txMapping.transactionNodes();
+
for (final GridDistributedTxMapping m : mappings.values()) {
final ClusterNode primary = m.primary();
- GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
- futId,
- tx.topologyVersion(),
- tx,
- timeout,
- m.reads(),
- m.writes(),
- m.near(),
- txMapping.transactionNodes(),
- true,
- tx.onePhaseCommit(),
- tx.needReturnValue() && tx.implicit(),
- tx.implicitSingle(),
- m.explicitLock(),
- tx.subjectId(),
- tx.taskNameHash(),
- false,
- tx.activeCachesDeploymentEnabled());
-
- for (IgniteTxEntry txEntry : m.entries()) {
- if (txEntry.op() == TRANSFORM)
- req.addDhtVersion(txEntry.txKey(), null);
+ if (primary.isLocal()) {
+ if (m.hasNearCacheEntries() && m.hasColocatedCacheEntries()) {
+ GridNearTxPrepareRequest nearReq = createRequest(txMapping.transactionNodes(),
+ m,
+ timeout,
+ m.nearEntriesReads(),
+ m.nearEntriesWrites());
+
+ prepareLocal(nearReq, m, ++miniId, true);
+
+ GridNearTxPrepareRequest colocatedReq = createRequest(txNodes,
+ m,
+ timeout,
+ m.colocatedEntriesReads(),
+ m.colocatedEntriesWrites());
+
+ prepareLocal(colocatedReq, m, ++miniId, false);
+ }
+ else {
+ GridNearTxPrepareRequest req = createRequest(txNodes, m, timeout, m.reads(), m.writes());
+
+ prepareLocal(req, m, ++miniId, m.hasNearCacheEntries());
+ }
}
+ else {
+ GridNearTxPrepareRequest req = createRequest(txNodes,
+ m,
+ timeout,
+ m.reads(),
+ m.writes());
- final MiniFuture fut = new MiniFuture(m, ++miniId);
+ final MiniFuture fut = new MiniFuture(m, ++miniId);
- req.miniId(fut.futureId());
+ req.miniId(fut.futureId());
- add(fut);
+ add(fut);
- if (primary.isLocal()) {
- IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(primary.id(),
- tx,
- req);
-
- prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
- @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
- try {
- fut.onResult(prepFut.get());
- }
- catch (IgniteCheckedException e) {
- fut.onError(e);
- }
- }
- });
- }
- else {
try {
cctx.io().send(primary, req, tx.ioPolicy());
@@ -395,12 +458,13 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
/**
* @param res Response.
+ * @param updateMapping Update mapping flag.
*/
- void onResult(GridNearTxPrepareResponse res) {
+ void onResult(GridNearTxPrepareResponse res, boolean updateMapping) {
if (res.error() != null)
onError(res.error());
else {
- onPrepareResponse(m, res);
+ onPrepareResponse(m, res, updateMapping);
onDone(res);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 37be0fb..89874ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -642,7 +642,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
* @param mapping Mapping to finish.
*/
private void readyNearMappingFromBackup(GridDistributedTxMapping mapping) {
- if (mapping.near()) {
+ if (mapping.hasNearCacheEntries()) {
GridCacheVersion xidVer = tx.xidVersion();
mapping.dhtVersion(xidVer, xidVer);
@@ -676,7 +676,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
private void finish(int miniId, GridDistributedTxMapping m, boolean commit) {
ClusterNode n = m.primary();
- assert !m.empty();
+ assert !m.empty() : m;
CacheWriteSynchronizationMode syncMode = tx.syncMode();
@@ -698,7 +698,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
m.explicitLock(),
tx.storeEnabled(),
tx.topologyVersion(),
- completedVer, // Reuse 'baseVersion' to do not add new fields in message.
+ completedVer, // Reuse 'baseVersion' to do not add new fields in message.
null,
null,
tx.size(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 62af536..f795ddc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -151,9 +151,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
@GridToStringExclude
private volatile GridNearTxFinishFuture rollbackFut;
- /** Entries to lock on next step of prepare stage. */
- private Collection<IgniteTxEntry> optimisticLockEntries = Collections.emptyList();
-
/** True if transaction contains near cache entries mapped to local node. */
private boolean nearLocallyMapped;
@@ -2425,14 +2422,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> optimisticLockEntries() {
- return optimisticLockEntries;
- }
+ assert false : "Should not be called";
- /**
- * @param optimisticLockEntries Optimistic lock entries.
- */
- void optimisticLockEntries(Collection<IgniteTxEntry> optimisticLockEntries) {
- this.optimisticLockEntries = optimisticLockEntries;
+ throw new UnsupportedOperationException();
}
/**
@@ -2862,8 +2854,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
if (m == null) {
mappings.put(m = new GridDistributedTxMapping(primary));
- m.near(map.near());
-
if (map.explicitLock())
m.markExplicitLock();
}
@@ -2889,8 +2879,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
mappings.put(m);
- m.near(map.near());
-
if (map.explicitLock())
m.markExplicitLock();
@@ -2933,14 +2921,16 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
Collection<GridCacheVersion> committedVers,
Collection<GridCacheVersion> rolledbackVers)
{
+ assert mapping.hasNearCacheEntries() : mapping;
+
// Process writes, then reads.
for (IgniteTxEntry txEntry : mapping.entries()) {
- if (CU.writes().apply(txEntry))
+ if (CU.WRITE_FILTER_NEAR.apply(txEntry))
readyNearLock(txEntry, mapping.dhtVersion(), pendingVers, committedVers, rolledbackVers);
}
for (IgniteTxEntry txEntry : mapping.entries()) {
- if (CU.reads().apply(txEntry))
+ if (CU.READ_FILTER_NEAR.apply(txEntry))
readyNearLock(txEntry, mapping.dhtVersion(), pendingVers, committedVers, rolledbackVers);
}
}
@@ -2952,7 +2942,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
*/
- void readyNearLock(IgniteTxEntry txEntry,
+ private void readyNearLock(IgniteTxEntry txEntry,
GridCacheVersion dhtVer,
Collection<GridCacheVersion> pendingVers,
Collection<GridCacheVersion> committedVers,
@@ -3333,11 +3323,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
needReturnValue() && implicit());
try {
- // At this point all the entries passed in must be enlisted in transaction because this is an
- // optimistic transaction.
- optimisticLockEntries = (serializable() && optimistic()) ? F.concat(false, writes, reads) : writes;
-
- userPrepare();
+ userPrepare((serializable() && optimistic()) ? F.concat(false, writes, reads) : writes);
// Make sure to add future before calling prepare on it.
cctx.mvcc().addFuture(fut);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 7f1f5a2..004e4da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -172,7 +172,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
Collection<UUID> backups = entry.getValue();
- if (backups.size() <= 1)
+ if (backups.size() <= 1 && !tx.txState().hasNearCacheConfigured(cctx, tx.topologyVersion()))
tx.onePhaseCommit(true);
}
}
@@ -180,9 +180,12 @@ public abstract class GridNearTxPrepareFutureAdapter extends
/**
* @param m Mapping.
* @param res Response.
+ * @param updateMapping Update mapping flag.
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
+ final void onPrepareResponse(GridDistributedTxMapping m,
+ GridNearTxPrepareResponse res,
+ boolean updateMapping) {
if (res == null)
return;
@@ -245,24 +248,25 @@ public abstract class GridNearTxPrepareFutureAdapter extends
}
if (!m.empty()) {
- GridCacheVersion writeVer = res.writeVersion();
-
- if (writeVer == null)
- writeVer = res.dhtVersion();
-
// This step is very important as near and DHT versions grow separately.
cctx.versions().onReceived(nodeId, res.dhtVersion());
- // Register DHT version.
- m.dhtVersion(res.dhtVersion(), writeVer);
+ if (updateMapping && m.hasNearCacheEntries()) {
+ GridCacheVersion writeVer = res.writeVersion();
+
+ if (writeVer == null)
+ writeVer = res.dhtVersion();
- GridDistributedTxMapping map = tx.mappings().get(nodeId);
+ // Register DHT version.
+ m.dhtVersion(res.dhtVersion(), writeVer);
- if (map != null)
- map.dhtVersion(res.dhtVersion(), writeVer);
+ GridDistributedTxMapping map = tx.mappings().get(nodeId);
+
+ if (map != null)
+ map.dhtVersion(res.dhtVersion(), writeVer);
- if (m.near())
tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
+ }
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 153ad04..a591517 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -111,20 +111,17 @@ public class IgniteTxHandler {
/**
* @param nearNodeId Node ID.
* @param req Request.
- * @return Prepare future.
*/
- private IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) {
+ private void processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) {
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Received near prepare request [txId=" + req.version() +
", node=" + nearNodeId + ']');
}
- IgniteInternalFuture<GridNearTxPrepareResponse> fut = prepareTx(nearNodeId, null, req);
+ IgniteInternalFuture<GridNearTxPrepareResponse> fut = prepareNearTx(nearNodeId, req, false);
assert req.txState() != null || fut == null || fut.error() != null ||
(ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null);
-
- return fut;
}
/**
@@ -209,41 +206,13 @@ public class IgniteTxHandler {
}
/**
- * @param nearNodeId Near node ID that initiated transaction.
- * @param locTx Optional local transaction.
- * @param req Near prepare request.
- * @return Future for transaction.
- */
- public IgniteInternalFuture<GridNearTxPrepareResponse> prepareTx(
- UUID nearNodeId,
- @Nullable GridNearTxLocal locTx,
- GridNearTxPrepareRequest req
- ) {
- assert nearNodeId != null;
- assert req != null;
-
- if (locTx != null) {
- if (req.near()) {
- // Make sure not to provide Near entries to DHT cache.
- req.cloneEntries();
-
- return prepareNearTx(nearNodeId, req);
- }
- else
- return prepareColocatedTx(locTx, req);
- }
- else
- return prepareNearTx(nearNodeId, req);
- }
-
- /**
* Prepares local colocated tx.
*
* @param locTx Local transaction.
* @param req Near prepare request.
* @return Prepare future.
*/
- private IgniteInternalFuture<GridNearTxPrepareResponse> prepareColocatedTx(
+ public IgniteInternalFuture<GridNearTxPrepareResponse> prepareColocatedTx(
final GridNearTxLocal locTx,
final GridNearTxPrepareRequest req
) {
@@ -308,16 +277,20 @@ public class IgniteTxHandler {
}
/**
- * Prepares near transaction.
- *
* @param nearNodeId Near node ID that initiated transaction.
* @param req Near prepare request.
+ * @param locReq Local request flag.
* @return Prepare future.
*/
- private IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx(
+ public IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx(
final UUID nearNodeId,
- final GridNearTxPrepareRequest req
+ final GridNearTxPrepareRequest req,
+ boolean locReq
) {
+ // Make sure not to provide Near entries to DHT cache.
+ if (locReq)
+ req.cloneEntries();
+
ClusterNode nearNode = ctx.node(nearNodeId);
if (nearNode == null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index 5743bfb..36f5f2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
@@ -293,6 +294,11 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
}
/** {@inheritDoc} */
+ @Override public boolean hasNearCacheConfigured(GridCacheSharedContext ctx, AffinityTopologyVersion topVer) {
+ return cacheCtx != null ? ctx.discovery().hasNearCache(cacheCtx.cacheId(), topVer) : false; // False when cacheCtx is null.
+ }
+
+ /** {@inheritDoc} */
public String toString() {
return S.toString(IgniteTxImplicitSingleStateImpl.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index a59ff51..5a708d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -225,7 +225,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
/** {@inheritDoc} */
- @Override public IgniteTxState txState() {
+ @Override public IgniteTxLocalState txState() {
return txState;
}
@@ -401,10 +401,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
/**
+ * @param entries Entries to lock, or {@code null} to use the default {@link IgniteInternalTx#optimisticLockEntries()}.
* @throws IgniteCheckedException If prepare step failed.
*/
@SuppressWarnings({"CatchGenericClass"})
- public void userPrepare() throws IgniteCheckedException {
+ public void userPrepare(@Nullable Collection<IgniteTxEntry> entries) throws IgniteCheckedException {
if (state() != PREPARING) {
if (remainingTime() == -1)
throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this);
@@ -420,7 +421,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
checkValid();
try {
- cctx.tm().prepareTx(this);
+ cctx.tm().prepareTx(this, entries);
}
catch (IgniteCheckedException e) {
throw e;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
index 123d396..fe9fcbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
@@ -17,6 +17,9 @@
package org.apache.ignite.internal.processors.cache.transactions;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+
/**
*
*/
@@ -41,4 +44,11 @@ public interface IgniteTxLocalState extends IgniteTxState {
*
*/
public void seal();
+
+ /**
+ * @param ctx Context.
+ * @param topVer Topology version.
+ * @return {@code True} if tx has cache with created near cache.
+ */
+ public boolean hasNearCacheConfigured(GridCacheSharedContext ctx, AffinityTopologyVersion topVer);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index da49c06..2da8dee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -774,12 +774,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- * Handles prepare stage of 2PC.
+ * Handles prepare stage.
*
* @param tx Transaction to prepare.
+ * @param entries Entries to lock, or {@code null} to use the default {@link IgniteInternalTx#optimisticLockEntries()}.
* @throws IgniteCheckedException If preparation failed.
*/
- public void prepareTx(IgniteInternalTx tx) throws IgniteCheckedException {
+ public void prepareTx(IgniteInternalTx tx, @Nullable Collection<IgniteTxEntry> entries) throws IgniteCheckedException {
if (tx.state() == MARKED_ROLLBACK) {
if (tx.remainingTime() == -1)
throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this);
@@ -799,7 +800,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
// Optimistic.
assert tx.optimistic() || !tx.local();
- if (!lockMultiple(tx, tx.optimisticLockEntries())) {
+ if (!lockMultiple(tx, entries != null ? entries : tx.optimisticLockEntries())) {
tx.setRollbackOnly();
throw new IgniteTxOptimisticCheckedException("Failed to prepare transaction (lock conflict): " + tx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index 7a45b6e..ed2526e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -85,7 +85,7 @@ public interface IgniteTxState {
public boolean hasNearCache(GridCacheSharedContext cctx);
/**
- * @param cacheCtx Ccntext.
+ * @param cacheCtx Context.
* @param tx Transaction.
* @throws IgniteCheckedException If cache check failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 304473e..3679208 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -28,12 +28,14 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheInterceptor;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
-import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -52,7 +54,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC
*/
public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
/** Active cache IDs. */
- private GridLongList activeCacheIds = new GridLongList();
+ private GridIntList activeCacheIds = new GridIntList();
/** Per-transaction read map. */
@GridToStringInclude
@@ -77,13 +79,13 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
/** {@inheritDoc} */
@Nullable @Override public Integer firstCacheId() {
- return activeCacheIds.isEmpty() ? null : (int)activeCacheIds.get(0);
+ return activeCacheIds.isEmpty() ? null : activeCacheIds.get(0);
}
/** {@inheritDoc} */
@Override public void unwindEvicts(GridCacheSharedContext cctx) {
for (int i = 0; i < activeCacheIds.size(); i++) {
- int cacheId = (int) activeCacheIds.get(i);
+ int cacheId = activeCacheIds.get(i);
GridCacheContext ctx = cctx.cacheContext(cacheId);
@@ -95,7 +97,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
/** {@inheritDoc} */
@Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) {
if (activeCacheIds.size() == 1) {
- int cacheId = (int)activeCacheIds.get(0);
+ int cacheId = activeCacheIds.get(0);
return cctx.cacheContext(cacheId);
}
@@ -106,7 +108,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
/** {@inheritDoc} */
@Override public void awaitLastFut(GridCacheSharedContext cctx) {
for (int i = 0; i < activeCacheIds.size(); i++) {
- int cacheId = (int)activeCacheIds.get(i);
+ int cacheId = activeCacheIds.get(i);
cctx.cacheContext(cacheId).cache().awaitLastFut();
}
@@ -157,7 +159,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
}
for (int i = 0; i < activeCacheIds.size(); i++) {
- int cacheId = (int)activeCacheIds.get(i);
+ int cacheId = activeCacheIds.get(i);
GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
@@ -175,7 +177,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
CacheWriteSynchronizationMode syncMode = CacheWriteSynchronizationMode.FULL_ASYNC;
for (int i = 0; i < activeCacheIds.size(); i++) {
- int cacheId = (int)activeCacheIds.get(i);
+ int cacheId = activeCacheIds.get(i);
CacheWriteSynchronizationMode cacheSyncMode =
cctx.cacheContext(cacheId).config().getWriteSynchronizationMode();
@@ -202,7 +204,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
/** {@inheritDoc} */
@Override public boolean hasNearCache(GridCacheSharedContext cctx) {
for (int i = 0; i < activeCacheIds.size(); i++) {
- int cacheId = (int)activeCacheIds.get(i);
+ int cacheId = activeCacheIds.get(i);
GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
@@ -236,7 +238,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
int idx = 0;
for (int i = 0; i < activeCacheIds.size(); i++) {
- int activeCacheId = (int)activeCacheIds.get(i);
+ int activeCacheId = activeCacheIds.get(i);
cacheNames.append(cctx.cacheContext(activeCacheId).name());
@@ -267,7 +269,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
GridCacheContext<?, ?> nonLocCtx = null;
for (int i = 0; i < activeCacheIds.size(); i++) {
- int cacheId = (int)activeCacheIds.get(i);
+ int cacheId = activeCacheIds.get(i);
GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
@@ -299,7 +301,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
GridCacheContext<?, ?> nonLocCtx = null;
for (int i = 0; i < activeCacheIds.size(); i++) {
- int cacheId = (int)activeCacheIds.get(i);
+ int cacheId = activeCacheIds.get(i);
GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
@@ -319,7 +321,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
@Override public boolean storeWriteThrough(GridCacheSharedContext cctx) {
if (!activeCacheIds.isEmpty()) {
for (int i = 0; i < activeCacheIds.size(); i++) {
- int cacheId = (int)activeCacheIds.get(i);
+ int cacheId = activeCacheIds.get(i);
CacheStoreManager store = cctx.cacheContext(cacheId).store();
@@ -334,7 +336,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
/** {@inheritDoc} */
@Override public boolean hasInterceptor(GridCacheSharedContext cctx) {
for (int i = 0; i < activeCacheIds.size(); i++) {
- int cacheId = (int)activeCacheIds.get(i);
+ int cacheId = activeCacheIds.get(i);
CacheInterceptor interceptor = cctx.cacheContext(cacheId).config().getInterceptor();
@@ -347,13 +349,13 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
/** {@inheritDoc} */
@Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) {
- GridLongList cacheIds = activeCacheIds;
+ GridIntList cacheIds = activeCacheIds;
if (!cacheIds.isEmpty()) {
Collection<CacheStoreManager> stores = new ArrayList<>(cacheIds.size());
for (int i = 0; i < cacheIds.size(); i++) {
- int cacheId = (int)cacheIds.get(i);
+ int cacheId = cacheIds.get(i);
CacheStoreManager store = cctx.cacheContext(cacheId).store();
@@ -370,7 +372,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
/** {@inheritDoc} */
@Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) {
for (int i = 0; i < activeCacheIds.size(); i++) {
- int cacheId = (int)activeCacheIds.get(i);
+ int cacheId = activeCacheIds.get(i);
GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
@@ -474,6 +476,22 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
}
/** {@inheritDoc} */
+ @Override public boolean hasNearCacheConfigured(GridCacheSharedContext ctx, AffinityTopologyVersion topVer) {
+ DiscoCache discoCache = ctx.discovery().discoCache(topVer);
+
+ assert discoCache != null : topVer;
+
+ for (int i = 0; i < activeCacheIds.size(); i++) {
+ int cacheId = activeCacheIds.get(i);
+
+ if (discoCache.hasNearCache(cacheId))
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
public String toString() {
return S.toString(IgniteTxStateImpl.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateCollectionView.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateCollectionView.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateCollectionView.java
index b4785a7..ff0105b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateCollectionView.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateCollectionView.java
@@ -22,7 +22,6 @@ import java.util.Iterator;
import org.apache.ignite.internal.util.GridSerializableCollection;
import org.apache.ignite.internal.util.lang.GridFunc;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.NotNull;
@@ -45,6 +44,7 @@ public class PredicateCollectionView<T> extends GridSerializableCollection<T> {
* @param col Input col that serves as a base for the view.
* @param preds Optional preds. If preds are not provided - all elements will be in the view.
*/
+ @SafeVarargs
public PredicateCollectionView(Collection<T> col, IgnitePredicate<? super T>... preds) {
this.col = col;
this.preds = preds;
@@ -70,9 +70,4 @@ public class PredicateCollectionView<T> extends GridSerializableCollection<T> {
@Override public boolean isEmpty() {
return F.isEmpty(preds) ? col.isEmpty() : !iterator().hasNext();
}
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(PredicateCollectionView.class, this);
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateMapView.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateMapView.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateMapView.java
index d5b97a6..01e6d8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateMapView.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateMapView.java
@@ -24,7 +24,6 @@ import org.apache.ignite.internal.util.GridSerializableMap;
import org.apache.ignite.internal.util.GridSerializableSet;
import org.apache.ignite.internal.util.lang.GridFunc;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -113,9 +112,4 @@ public class PredicateMapView<K, V> extends GridSerializableMap<K, V> {
@Override public boolean containsKey(Object key) {
return GridFunc.isAll((K)key, preds) && map.containsKey(key);
}
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(PredicateMapView.class, this);
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateSetView.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateSetView.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateSetView.java
index 99fc2fd..8937107 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateSetView.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateSetView.java
@@ -23,7 +23,6 @@ import org.apache.ignite.internal.util.GridSerializableMap;
import org.apache.ignite.internal.util.GridSerializableSet;
import org.apache.ignite.internal.util.lang.GridFunc;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.NotNull;
@@ -145,9 +144,4 @@ public class PredicateSetView<K, V> extends GridSerializableMap<K, V> {
@Override public boolean containsKey(Object key) {
return GridFunc.isAll((K)key, preds) && set.contains(key);
}
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(PredicateSetView.class, this);
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView.java
index 8186914..d8aa1d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView.java
@@ -22,7 +22,6 @@ import java.util.Iterator;
import org.apache.ignite.internal.util.GridSerializableCollection;
import org.apache.ignite.internal.util.GridSerializableIterator;
import org.apache.ignite.internal.util.lang.GridFunc;
-import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -87,9 +86,4 @@ public class ReadOnlyCollectionView<T> extends GridSerializableCollection<T> {
@Override public boolean equals(Object obj) {
return obj instanceof Collection && GridFunc.eqNotOrdered(this, (Collection)obj);
}
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(ReadOnlyCollectionView.class, this);
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView2X.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView2X.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView2X.java
index 82ec651..7a60e17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView2X.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView2X.java
@@ -22,7 +22,6 @@ import java.util.Iterator;
import org.apache.ignite.internal.util.GridSerializableCollection;
import org.apache.ignite.internal.util.GridSerializableIterator;
import org.apache.ignite.internal.util.lang.GridFunc;
-import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.NotNull;
/**
@@ -92,9 +91,4 @@ public class ReadOnlyCollectionView2X<T> extends GridSerializableCollection<T> {
@Override public boolean equals(Object obj) {
return obj instanceof Collection && GridFunc.eqNotOrdered(this, (Collection<?>)obj);
}
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(ReadOnlyCollectionView2X.class, this);
- }
}