You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 18:27:48 UTC
[25/31] incubator-ignite git commit: ignite-471-2: huge merge from
sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 10b84e2..adea9e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -54,7 +54,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
private GridDhtPartitionTopology top;
/** Preloader. */
- protected GridCachePreloader<K, V> preldr;
+ protected GridCachePreloader preldr;
/** Multi tx future holder. */
private ThreadLocal<IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture>> multiTxHolder = new ThreadLocal<>();
@@ -75,7 +75,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) {
super(ctx, ctx.config().getStartSize());
- top = new GridDhtPartitionTopologyImpl<>(ctx);
+ top = new GridDhtPartitionTopologyImpl(ctx);
}
/**
@@ -87,7 +87,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) {
super(ctx, map);
- top = new GridDhtPartitionTopologyImpl<>(ctx);
+ top = new GridDhtPartitionTopologyImpl(ctx);
}
/** {@inheritDoc} */
@@ -168,17 +168,17 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
}
/** {@inheritDoc} */
- @Override public GridCachePreloader<K, V> preloader() {
+ @Override public GridCachePreloader preloader() {
return preldr;
}
/**
* @return DHT preloader.
*/
- public GridDhtPreloader<K, V> dhtPreloader() {
+ public GridDhtPreloader dhtPreloader() {
assert preldr instanceof GridDhtPreloader;
- return (GridDhtPreloader<K, V>)preldr;
+ return (GridDhtPreloader)preldr;
}
/**
@@ -932,6 +932,21 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
}
/**
+ * Decides whether an operation mapped on one topology version has to be remapped on another.
+ * Identical versions never require a remap. Otherwise the affinity nodes that discovery reports
+ * for this cache ({@code ctx.name()}) at each version are compared with {@code equals}, so a
+ * version change that leaves the affinity nodes unchanged does not force a remap.
+ *
+ * @param expVer Expected topology version (for example the version carried by a request).
+ * @param curVer Current topology version.
+ * @return {@code True} if cache affinity changed and operation should be remapped.
+ */
+ protected final boolean needRemap(AffinityTopologyVersion expVer, AffinityTopologyVersion curVer) {
+ if (expVer.equals(curVer))
+ return false;
+
+ Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.name(), expVer);
+ Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.name(), curVer);
+
+ return !cacheNodes0.equals(cacheNodes1);
+ }
+
+ /**
* @param primary If {@code true} includes primary entries.
* @param backup If {@code true} includes backup entries.
* @return Local entries iterator.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index c9a7af8..89b85c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -292,12 +292,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
return ret;
}
- /**
- * Calls {@link GridDhtLocalPartition#onUnlock()} for this entry's partition.
- */
+ /** {@inheritDoc} */
@Override public void onUnlock() {
- super.onUnlock();
-
locPart.onUnlock();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index f6f930e..742fbfe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -295,6 +295,11 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
if (info == null)
continue;
+ boolean addReader = (!e.deleted() && k.getValue() && !skipVals);
+
+ if (addReader)
+ e.unswap(false);
+
// Register reader. If there are active transactions for this entry,
// then will wait for their completion before proceeding.
// TODO: GG-4003:
@@ -303,8 +308,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
// TODO: To fix, check that reader is contained in the list of readers once
// TODO: again after the returned future completes - if not, try again.
// TODO: Also, why is info read before transactions are complete, and not after?
- IgniteInternalFuture<Boolean> f = (!e.deleted() && k.getValue() && !skipVals) ?
- e.addReader(reader, msgId, topVer) : null;
+ IgniteInternalFuture<Boolean> f = addReader ? e.addReader(reader, msgId, topVer) : null;
if (f != null) {
if (txFut == null)
@@ -317,6 +321,9 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
break;
}
+ catch (IgniteCheckedException err) {
+ return new GridFinishedFuture<>(err);
+ }
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
log.debug("Got removed entry when getting a DHT value: " + e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index c57eded..bdaa552 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -47,7 +47,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*;
/**
* Cache lock future.
*/
-public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
+public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
implements GridCacheMvccFuture<Boolean>, GridDhtFuture<Boolean>, GridCacheMappedVersion {
/** */
private static final long serialVersionUID = 0L;
@@ -60,7 +60,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
/** Cache registry. */
@GridToStringExclude
- private GridCacheContext<K, V> cctx;
+ private GridCacheContext<?, ?> cctx;
/** Near node ID. */
private UUID nearNodeId;
@@ -151,7 +151,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
* @param skipStore Skip store flag.
*/
public GridDhtLockFuture(
- GridCacheContext<K, V> cctx,
+ GridCacheContext<?, ?> cctx,
UUID nearNodeId,
GridCacheVersion nearLockVer,
@NotNull AffinityTopologyVersion topVer,
@@ -221,7 +221,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
* @param cacheCtx Cache context.
* @param invalidPart Partition to retry.
*/
- void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int invalidPart) {
+ void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, int invalidPart) {
invalidParts.add(invalidPart);
// Register invalid partitions with transaction.
@@ -1170,7 +1170,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
* @param entries Entries to check.
*/
@SuppressWarnings({"ForLoopReplaceableByForEach"})
- private void evictReaders(GridCacheContext<K, V> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId,
+ private void evictReaders(GridCacheContext<?, ?> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId,
@Nullable List<GridDhtCacheEntry> entries) {
if (entries == null || keys == null || entries.isEmpty() || keys.isEmpty())
return;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 073e0e7..374ab87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -41,7 +41,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
* Partition topology.
*/
@GridToStringExclude
-class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
+class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** If true, then check consistency. */
private static final boolean CONSISTENCY_CHECK = false;
@@ -49,7 +49,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
private static final boolean FULL_MAP_DEBUG = false;
/** Context. */
- private final GridCacheContext<K, V> cctx;
+ private final GridCacheContext<?, ?> cctx;
/** Logger. */
private final IgniteLogger log;
@@ -85,7 +85,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
/**
* @param cctx Context.
*/
- GridDhtPartitionTopologyImpl(GridCacheContext<K, V> cctx) {
+ GridDhtPartitionTopologyImpl(GridCacheContext<?, ?> cctx) {
assert cctx != null;
this.cctx = cctx;
@@ -239,7 +239,9 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
removeNode(exchId.nodeId());
// In case if node joins, get topology at the time of joining node.
- ClusterNode oldest = CU.oldest(cctx.shared(), topVer);
+ ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+ assert oldest != null;
if (log.isDebugEnabled())
log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
@@ -247,7 +249,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
long updateSeq = this.updateSeq.incrementAndGet();
// If this is the oldest node.
- if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId())) {
+ if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion())) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
@@ -274,7 +276,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
if (cctx.rebalanceEnabled()) {
for (int p = 0; p < num; p++) {
// If this is the first node in grid.
- boolean added = exchFut.isCacheAdded(cctx.cacheId());
+ boolean added = exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion());
if ((oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId()) && exchId.isJoined()) || added) {
assert exchId.isJoined() || added;
@@ -604,7 +606,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
try {
return new GridDhtPartitionMap(cctx.nodeId(), updateSeq.get(),
- F.viewReadOnly(locParts, CU.<K, V>part2state()), true);
+ F.viewReadOnly(locParts, CU.part2state()), true);
}
finally {
lock.readLock().unlock();
@@ -660,13 +662,15 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
* @return List of nodes for the partition.
*/
private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
- Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null;
+ Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null;
lock.readLock().lock();
try {
assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
- ", allIds=" + allIds + ", node2part=" + node2part + ']';
+ ", allIds=" + allIds +
+ ", node2part=" + node2part +
+ ", cache=" + cctx.name() + ']';
Collection<UUID> nodeIds = part2node.get(p);
@@ -738,7 +742,11 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
try {
assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
- ", locNodeId=" + cctx.localNode().id() + ", locName=" + cctx.gridName() + ']';
+ ", cache=" + cctx.name() +
+ ", started=" + cctx.started() +
+ ", stopping=" + stopping +
+ ", locNodeId=" + cctx.localNode().id() +
+ ", locName=" + cctx.gridName() + ']';
GridDhtPartitionFullMap m = node2part;
@@ -756,6 +764,8 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
if (log.isDebugEnabled())
log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
+ assert partMap != null;
+
lock.writeLock().lock();
try {
@@ -1024,7 +1034,9 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
assert nodeId.equals(cctx.nodeId());
// In case if node joins, get topology at the time of joining node.
- ClusterNode oldest = CU.oldest(cctx, topVer);
+ ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+ assert oldest != null;
// If this node became the oldest node.
if (oldest.id().equals(cctx.nodeId())) {
@@ -1074,7 +1086,9 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
assert nodeId != null;
assert lock.writeLock().isHeldByCurrentThread();
- ClusterNode oldest = CU.oldest(cctx, topVer);
+ ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+ assert oldest != null;
ClusterNode loc = cctx.localNode();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 26eef50..703daf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -81,7 +81,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
@Override public void start() throws IgniteCheckedException {
super.start();
- preldr = new GridDhtPreloader<>(ctx);
+ preldr = new GridDhtPreloader(ctx);
preldr.start();
@@ -518,7 +518,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
return;
}
- // Group lock can be only started from local node, so we never start group lock transaction on remote node.
IgniteInternalFuture<?> f = lockAllAsync(ctx, nearNode, req, null);
// Register listener just so we print out errors.
@@ -534,8 +533,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse res) {
assert nodeId != null;
assert res != null;
- GridDhtLockFuture<K, V> fut = (GridDhtLockFuture<K, V>)ctx.mvcc().<Boolean>future(res.version(),
- res.futureId());
+ GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>future(res.version(), res.futureId());
if (fut == null) {
if (log.isDebugEnabled())
@@ -604,7 +602,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
assert tx != null;
- GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(
+ GridDhtLockFuture fut = new GridDhtLockFuture(
ctx,
tx.nearNodeId(),
tx.nearXidVersion(),
@@ -669,7 +667,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
* @return Future.
*/
public IgniteInternalFuture<GridNearLockResponse> lockAllAsync(
- final GridCacheContext<K, V> cacheCtx,
+ final GridCacheContext<?, ?> cacheCtx,
final ClusterNode nearNode,
final GridNearLockRequest req,
@Nullable final CacheEntryPredicate[] filter0) {
@@ -719,26 +717,57 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (filter == null)
filter = req.filter();
- GridDhtLockFuture<K, V> fut = null;
+ GridDhtLockFuture fut = null;
if (!req.inTx()) {
- fut = new GridDhtLockFuture<>(ctx,
- nearNode.id(),
- req.version(),
- req.topologyVersion(),
- cnt,
- req.txRead(),
- req.needReturnValue(),
- req.timeout(),
- tx,
- req.threadId(),
- req.accessTtl(),
- filter,
- req.skipStore());
+ GridDhtPartitionTopology top = null;
+
+ if (req.firstClientRequest()) {
+ assert CU.clientNode(nearNode);
+
+ top = topology();
+
+ topology().readLock();
+ }
+
+ try {
+ if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) {
+ if (log.isDebugEnabled()) {
+ log.debug("Client topology version mismatch, need remap lock request [" +
+ "reqTopVer=" + req.topologyVersion() +
+ ", locTopVer=" + top.topologyVersion() +
+ ", req=" + req + ']');
+ }
+
+ GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
+ req,
+ top.topologyVersion());
+
+ return new GridFinishedFuture<>(res);
+ }
+
+ fut = new GridDhtLockFuture(ctx,
+ nearNode.id(),
+ req.version(),
+ req.topologyVersion(),
+ cnt,
+ req.txRead(),
+ req.needReturnValue(),
+ req.timeout(),
+ tx,
+ req.threadId(),
+ req.accessTtl(),
+ filter,
+ req.skipStore());
- // Add before mapping.
- if (!ctx.mvcc().addFuture(fut))
- throw new IllegalStateException("Duplicate future ID: " + fut);
+ // Add before mapping.
+ if (!ctx.mvcc().addFuture(fut))
+ throw new IllegalStateException("Duplicate future ID: " + fut);
+ }
+ finally {
+ if (top != null)
+ top.readUnlock();
+ }
}
boolean timedout = false;
@@ -788,45 +817,76 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
// Handle implicit locks for pessimistic transactions.
if (req.inTx()) {
if (tx == null) {
- tx = new GridDhtTxLocal(
- ctx.shared(),
- nearNode.id(),
- req.version(),
- req.futureId(),
- req.miniId(),
- req.threadId(),
- req.implicitTx(),
- req.implicitSingleTx(),
- ctx.systemTx(),
- false,
- ctx.ioPolicy(),
- PESSIMISTIC,
- req.isolation(),
- req.timeout(),
- req.isInvalidate(),
- false,
- req.txSize(),
- null,
- req.subjectId(),
- req.taskNameHash());
+ GridDhtPartitionTopology top = null;
- tx.syncCommit(req.syncCommit());
+ if (req.firstClientRequest()) {
+ assert CU.clientNode(nearNode);
- tx = ctx.tm().onCreated(null, tx);
+ top = topology();
- if (tx == null || !tx.init()) {
- String msg = "Failed to acquire lock (transaction has been completed): " +
- req.version();
+ topology().readLock();
+ }
- U.warn(log, msg);
+ try {
+ if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) {
+ if (log.isDebugEnabled()) {
+ log.debug("Client topology version mismatch, need remap lock request [" +
+ "reqTopVer=" + req.topologyVersion() +
+ ", locTopVer=" + top.topologyVersion() +
+ ", req=" + req + ']');
+ }
- if (tx != null)
- tx.rollback();
+ GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
+ req,
+ top.topologyVersion());
- return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
- }
+ return new GridFinishedFuture<>(res);
+ }
- tx.topologyVersion(req.topologyVersion());
+ tx = new GridDhtTxLocal(
+ ctx.shared(),
+ nearNode.id(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.threadId(),
+ req.implicitTx(),
+ req.implicitSingleTx(),
+ ctx.systemTx(),
+ false,
+ ctx.ioPolicy(),
+ PESSIMISTIC,
+ req.isolation(),
+ req.timeout(),
+ req.isInvalidate(),
+ false,
+ req.txSize(),
+ null,
+ req.subjectId(),
+ req.taskNameHash());
+
+ tx.syncCommit(req.syncCommit());
+
+ tx = ctx.tm().onCreated(null, tx);
+
+ if (tx == null || !tx.init()) {
+ String msg = "Failed to acquire lock (transaction has been completed): " +
+ req.version();
+
+ U.warn(log, msg);
+
+ if (tx != null)
+ tx.rollback();
+
+ return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
+ }
+
+ tx.topologyVersion(req.topologyVersion());
+ }
+ finally {
+ if (top != null)
+ top.readUnlock();
+ }
}
ctx.tm().txContext(tx);
@@ -947,6 +1007,42 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
}
/**
+ * Builds a lock response that carries the topology version the client should remap to, sends
+ * it to the client node and returns it. The response is created with {@code false}, {@code 0}
+ * and {@code null} in the positions where the other response constructions in this class pass
+ * the one-phase-commit flag, the entries count and the error.
+ * <p>
+ * Send failures are not propagated to the caller: a {@code ClusterTopologyCheckedException}
+ * is only logged at debug level (the client node failed), any other
+ * {@code IgniteCheckedException} is logged as an error. The response object is returned in
+ * either case.
+ *
+ * @param nearNode Client node.
+ * @param req Request.
+ * @param topVer Remap version, must not be {@code null} (asserted).
+ * @return Response.
+ */
+ private GridNearLockResponse sendClientLockRemapResponse(ClusterNode nearNode,
+ GridNearLockRequest req,
+ AffinityTopologyVersion topVer) {
+ assert topVer != null;
+
+ GridNearLockResponse res = new GridNearLockResponse(
+ ctx.cacheId(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ false,
+ 0,
+ null,
+ topVer);
+
+ try {
+ ctx.io().send(nearNode, res, ctx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send client lock remap response, client node failed " +
+ "[node=" + nearNode + ", req=" + req + ']');
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send client lock remap response [node=" + nearNode + ", req=" + req + ']', e);
+ }
+
+ return res;
+ }
+
+ /**
* @param nearNode Near node.
* @param entries Entries.
* @param req Lock request.
@@ -968,7 +1064,13 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
try {
// Send reply back to originating near node.
GridNearLockResponse res = new GridNearLockResponse(ctx.cacheId(),
- req.version(), req.futureId(), req.miniId(), tx != null && tx.onePhaseCommit(), entries.size(), err);
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ tx != null && tx.onePhaseCommit(),
+ entries.size(),
+ err,
+ null);
if (err == null) {
res.pending(localDhtPendingVersions(entries, mappedVer));
@@ -1077,8 +1179,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
U.error(log, "Failed to get value for lock reply message for node [node=" +
U.toShortString(nearNode) + ", req=" + req + ']', e);
- return new GridNearLockResponse(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), false,
- entries.size(), e);
+ return new GridNearLockResponse(ctx.cacheId(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ false,
+ entries.size(),
+ e,
+ null);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 54b59b8..90edb0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -52,15 +52,13 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
private static final long serialVersionUID = 0L;
/** Near mappings. */
- protected Map<UUID, GridDistributedTxMapping> nearMap =
- new ConcurrentHashMap8<>();
+ protected Map<UUID, GridDistributedTxMapping> nearMap = new ConcurrentHashMap8<>();
/** DHT mappings. */
- protected Map<UUID, GridDistributedTxMapping> dhtMap =
- new ConcurrentHashMap8<>();
+ protected Map<UUID, GridDistributedTxMapping> dhtMap = new ConcurrentHashMap8<>();
/** Mapped flag. */
- private AtomicBoolean mapped = new AtomicBoolean();
+ protected AtomicBoolean mapped = new AtomicBoolean();
/** */
private long dhtThreadId;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 293cf95..af0fbdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -582,7 +582,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
tx.writeVersion(),
tx.invalidPartitions(),
ret,
- prepErr);
+ prepErr,
+ null);
if (prepErr == null) {
addDhtValues(res);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 8bbfe96..8630421 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -171,7 +171,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
metrics = m;
- preldr = new GridDhtPreloader<>(ctx);
+ preldr = new GridDhtPreloader(ctx);
preldr.start();
@@ -737,6 +737,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable final CacheEntryPredicate[] filter,
final boolean waitTopFut
) {
+ assert ctx.updatesAllowed();
+
if (map != null && keyCheck)
validateCacheKeys(map.keySet());
@@ -793,6 +795,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean rawRetval,
@Nullable final CacheEntryPredicate[] filter
) {
+ assert ctx.updatesAllowed();
+
final boolean statsEnabled = ctx.config().isStatisticsEnabled();
final long start = statsEnabled ? System.nanoTime() : 0L;
@@ -1024,9 +1028,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
IgniteCacheExpiryPolicy expiry = null;
try {
- // If batch store update is enabled, we need to lock all entries.
- // First, need to acquire locks on cache entries, then check filter.
- List<GridDhtCacheEntry> locked = lockEntries(keys, req.topologyVersion());
+ List<GridDhtCacheEntry> locked = null;
Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
try {
@@ -1043,11 +1045,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
// Do not check topology version for CLOCK versioning since
- // partition exchange will wait for near update future.
+ // partition exchange will wait for near update future (if future is on server node).
// Also do not check topology version if topology was locked on near node by
// external transaction or explicit lock.
- if (topology().topologyVersion().equals(req.topologyVersion()) || req.topologyLocked() ||
- ctx.config().getAtomicWriteOrderMode() == CLOCK) {
+ if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
+ !needRemap(req.topologyVersion(), topology().topologyVersion())) {
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null) {
@@ -1056,13 +1058,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
+ // If batch store update is enabled, we need to lock all entries.
+ // First, need to acquire locks on cache entries, then check filter.
+ locked = lockEntries(keys, req.topologyVersion());
+
boolean hasNear = ctx.discovery().cacheNearNode(node, name());
GridCacheVersion ver = req.updateVersion();
if (ver == null) {
// Assign next version for update inside entries lock.
- ver = ctx.versions().next(req.topologyVersion());
+ ver = ctx.versions().next(topology().topologyVersion());
if (hasNear)
res.nearVersion(ver);
@@ -1105,7 +1111,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
retVal = updRes.invokeResults();
}
else {
- UpdateSingleResult<K, V> updRes = updateSingle(node,
+ UpdateSingleResult updRes = updateSingle(node,
hasNear,
req,
res,
@@ -1144,7 +1150,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
e.printStackTrace();
}
finally {
- unlockEntries(locked, req.topologyVersion());
+ if (locked != null)
+ unlockEntries(locked, req.topologyVersion());
// Enqueue if necessary after locks release.
if (deleted != null) {
@@ -1157,7 +1164,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
catch (GridDhtInvalidPartitionException ignore) {
- assert ctx.config().getAtomicWriteOrderMode() == PRIMARY;
+ assert !req.fastMap() || req.clientRequest() : req;
if (log.isDebugEnabled())
log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req);
@@ -1605,7 +1612,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @return Return value.
* @throws GridCacheEntryRemovedException Should be never thrown.
*/
- private UpdateSingleResult<K, V> updateSingle(
+ private UpdateSingleResult updateSingle(
ClusterNode node,
boolean hasNear,
GridNearAtomicUpdateRequest req,
@@ -1799,7 +1806,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
- return new UpdateSingleResult<>(retVal, deleted, dhtFut);
+ return new UpdateSingleResult(retVal, deleted, dhtFut);
}
/**
@@ -2572,7 +2579,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* Result of {@link GridDhtAtomicCache#updateSingle} execution.
*/
- private static class UpdateSingleResult<K, V> {
+ private static class UpdateSingleResult {
/** */
private final GridCacheReturn retVal;
@@ -2772,14 +2779,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public void onTimeout() {
if (guard.compareAndSet(false, true)) {
- writeLock().lock();
+ ctx.closures().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ writeLock().lock();
- try {
- finish();
- }
- finally {
- writeLock().unlock();
- }
+ try {
+ finish();
+ }
+ finally {
+ writeLock().unlock();
+ }
+ }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 40ab104..ff8454e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -86,6 +86,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
/** Future keys. */
private Collection<KeyCacheObject> keys;
+ /** */
+ private boolean waitForExchange;
+
/**
* @param cctx Cache context.
* @param completionCb Callback to invoke when future is completed.
@@ -113,6 +116,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class);
keys = new ArrayList<>(updateReq.keys().size());
+
+ boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest());
+
+ waitForExchange = !topLocked;
}
/** {@inheritDoc} */
@@ -164,8 +171,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
/** {@inheritDoc} */
@Override public boolean waitForPartitionExchange() {
- // Wait dht update futures in PRIMARY mode.
- return cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+ return waitForExchange;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 76e05e5..07f5ecf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -128,6 +128,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/** Fast map flag. */
private final boolean fastMap;
+ /** */
+ private boolean fastMapRemap;
+
+ /** */
+ private GridCacheVersion updVer;
+
/** Near cache flag. */
private final boolean nearEnabled;
@@ -304,11 +310,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
if (topVer == null)
- mapOnTopology(keys, false, null, waitTopFut);
+ mapOnTopology(null, false, null, waitTopFut);
else {
topLocked = true;
- map0(topVer, keys, false, null);
+ map0(topVer, null, false, null);
}
}
@@ -343,9 +349,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
*/
public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
if (res.remapKeys() != null) {
- assert cctx.config().getAtomicWriteOrderMode() == PRIMARY;
+ assert !fastMap || cctx.kernalContext().clientNode();
+
+ Collection<KeyCacheObject> remapKeys = fastMap ? null : res.remapKeys();
- mapOnTopology(res.remapKeys(), true, nodeId, true);
+ mapOnTopology(remapKeys, true, nodeId, true);
return;
}
@@ -454,9 +462,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
else {
if (waitTopFut) {
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override
- public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- mapOnTopology(keys, remap, oldNodeId, waitTopFut);
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ mapOnTopology(keys, remap, oldNodeId, waitTopFut);
+ }
+ });
}
});
}
@@ -476,29 +487,43 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/**
* Checks if future is ready to be completed.
*/
- private synchronized void checkComplete() {
- if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) {
- CachePartialUpdateCheckedException err0 = err;
+ private void checkComplete() {
+ boolean remap = false;
- if (err0 != null)
- onDone(err0);
- else
- onDone(opRes);
+ synchronized (this) {
+ if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) {
+ CachePartialUpdateCheckedException err0 = err;
+
+ if (err0 != null)
+ onDone(err0);
+ else {
+ if (fastMapRemap) {
+ assert cctx.kernalContext().clientNode();
+
+ remap = true;
+ }
+ else
+ onDone(opRes);
+ }
+ }
}
+
+ if (remap)
+ mapOnTopology(null, true, null, true);
}
/**
* @param topVer Topology version.
- * @param keys Keys to map.
+ * @param remapKeys Keys to remap or {@code null} to map all keys.
* @param remap Flag indicating if this is partial remap for this future.
* @param oldNodeId Old node ID if was remap.
*/
private void map0(
AffinityTopologyVersion topVer,
- Collection<?> keys,
+ @Nullable Collection<?> remapKeys,
boolean remap,
@Nullable UUID oldNodeId) {
- assert oldNodeId == null || remap;
+ assert oldNodeId == null || remap || fastMapRemap;
Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
@@ -519,12 +544,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
CacheConfiguration ccfg = cctx.config();
// Assign version on near node in CLOCK ordering mode even if fastMap is false.
- GridCacheVersion updVer = ccfg.getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null;
+ if (updVer == null)
+ updVer = ccfg.getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null;
if (updVer != null && log.isDebugEnabled())
log.debug("Assigned fast-map version for update on near node: " + updVer);
if (keys.size() == 1 && !fastMap && (single == null || single)) {
+ assert remapKeys == null || remapKeys.size() == 1 : remapKeys;
+
Object key = F.first(keys);
Object val;
@@ -610,7 +638,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
filter,
subjId,
taskNameHash,
- skipStore);
+ skipStore,
+ cctx.kernalContext().clientNode());
req.addUpdateEntry(cacheKey,
val,
@@ -647,9 +676,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
// Must do this in synchronized block because we need to atomically remove and add mapping.
// Otherwise checkComplete() may see empty intermediate state.
synchronized (this) {
- if (remap)
+ if (oldNodeId != null)
removeMapping(oldNodeId);
+ // For fastMap mode wait for all responses before remapping.
+ if (remap && fastMap && !mappings.isEmpty()) {
+ fastMapRemap = true;
+
+ return;
+ }
+
// Create mappings first, then send messages.
for (Object key : keys) {
if (key == null) {
@@ -705,6 +741,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+ if (remapKeys != null && !remapKeys.contains(cacheKey))
+ continue;
+
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
@@ -748,7 +787,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
filter,
subjId,
taskNameHash,
- skipStore);
+ skipStore,
+ cctx.kernalContext().clientNode());
pendingMappings.put(nodeId, mapped);
@@ -763,6 +803,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
i++;
}
}
+
+ fastMapRemap = false;
}
if ((single == null || single) && pendingMappings.size() == 1) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index a96a666..86c5ab8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -132,6 +132,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
/** Skip write-through to a persistent storage. */
private boolean skipStore;
+ /** */
+ private boolean clientReq;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -148,6 +151,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
* @param fastMap Fast map scheme flag.
* @param updateVer Update version set if fast map is performed.
* @param topVer Topology version.
+ * @param topLocked Topology locked flag.
* @param syncMode Synchronization mode.
* @param op Cache update operation.
* @param retval Return value required flag.
@@ -157,6 +161,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
* @param subjId Subject ID.
* @param taskNameHash Task name hash code.
* @param skipStore Skip write-through to a persistent storage.
+ * @param clientReq Client node request flag.
*/
public GridNearAtomicUpdateRequest(
int cacheId,
@@ -174,7 +179,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
@Nullable CacheEntryPredicate[] filter,
@Nullable UUID subjId,
int taskNameHash,
- boolean skipStore
+ boolean skipStore,
+ boolean clientReq
) {
this.cacheId = cacheId;
this.nodeId = nodeId;
@@ -193,6 +199,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
this.subjId = subjId;
this.taskNameHash = taskNameHash;
this.skipStore = skipStore;
+ this.clientReq = clientReq;
keys = new ArrayList<>();
}
@@ -266,6 +273,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
}
/**
+ * @return {@code True} if request sent from client node.
+ */
+ public boolean clientRequest() {
+ return clientReq;
+ }
+
+ /**
* @return Cache write synchronization mode.
*/
public CacheWriteSynchronizationMode writeSynchronizationMode() {
@@ -574,126 +588,132 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
switch (writer.state()) {
case 3:
- if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
+ if (!writer.writeBoolean("clientReq", clientReq))
return false;
writer.incrementState();
case 4:
- if (!writer.writeMessage("conflictTtls", conflictTtls))
+ if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
return false;
writer.incrementState();
case 5:
- if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("conflictTtls", conflictTtls))
return false;
writer.incrementState();
case 6:
- if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
+ if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 7:
- if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
+ if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
case 8:
- if (!writer.writeBoolean("fastMap", fastMap))
+ if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
return false;
writer.incrementState();
case 9:
- if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
+ if (!writer.writeBoolean("fastMap", fastMap))
return false;
writer.incrementState();
case 10:
- if (!writer.writeMessage("futVer", futVer))
+ if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 11:
- if (!writer.writeBoolean("hasPrimary", hasPrimary))
+ if (!writer.writeMessage("futVer", futVer))
return false;
writer.incrementState();
case 12:
- if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+ if (!writer.writeBoolean("hasPrimary", hasPrimary))
return false;
writer.incrementState();
case 13:
- if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
+ if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
case 14:
- if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+ if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 15:
- if (!writer.writeBoolean("retval", retval))
+ if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
return false;
writer.incrementState();
case 16:
- if (!writer.writeBoolean("skipStore", skipStore))
+ if (!writer.writeBoolean("retval", retval))
return false;
writer.incrementState();
case 17:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeBoolean("skipStore", skipStore))
return false;
writer.incrementState();
case 18:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 19:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
return false;
writer.incrementState();
case 20:
- if (!writer.writeBoolean("topLocked", topLocked))
+ if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
case 21:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeBoolean("topLocked", topLocked))
return false;
writer.incrementState();
case 22:
- if (!writer.writeMessage("updateVer", updateVer))
+ if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
case 23:
+ if (!writer.writeMessage("updateVer", updateVer))
+ return false;
+
+ writer.incrementState();
+
+ case 24:
if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
return false;
@@ -716,7 +736,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
switch (reader.state()) {
case 3:
- conflictExpireTimes = reader.readMessage("conflictExpireTimes");
+ clientReq = reader.readBoolean("clientReq");
if (!reader.isLastRead())
return false;
@@ -724,7 +744,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 4:
- conflictTtls = reader.readMessage("conflictTtls");
+ conflictExpireTimes = reader.readMessage("conflictExpireTimes");
if (!reader.isLastRead())
return false;
@@ -732,7 +752,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 5:
- conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
+ conflictTtls = reader.readMessage("conflictTtls");
if (!reader.isLastRead())
return false;
@@ -740,7 +760,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 6:
- entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
+ conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -748,7 +768,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 7:
- expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
+ entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
if (!reader.isLastRead())
return false;
@@ -756,7 +776,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 8:
- fastMap = reader.readBoolean("fastMap");
+ expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
if (!reader.isLastRead())
return false;
@@ -764,7 +784,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 9:
- filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
+ fastMap = reader.readBoolean("fastMap");
if (!reader.isLastRead())
return false;
@@ -772,7 +792,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 10:
- futVer = reader.readMessage("futVer");
+ filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
if (!reader.isLastRead())
return false;
@@ -780,7 +800,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 11:
- hasPrimary = reader.readBoolean("hasPrimary");
+ futVer = reader.readMessage("futVer");
if (!reader.isLastRead())
return false;
@@ -788,7 +808,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 12:
- invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+ hasPrimary = reader.readBoolean("hasPrimary");
if (!reader.isLastRead())
return false;
@@ -796,7 +816,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 13:
- keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+ invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
if (!reader.isLastRead())
return false;
@@ -804,6 +824,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 14:
+ keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 15:
byte opOrd;
opOrd = reader.readByte("op");
@@ -815,7 +843,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 15:
+ case 16:
retval = reader.readBoolean("retval");
if (!reader.isLastRead())
@@ -823,7 +851,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 16:
+ case 17:
skipStore = reader.readBoolean("skipStore");
if (!reader.isLastRead())
@@ -831,7 +859,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 17:
+ case 18:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -839,7 +867,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 18:
+ case 19:
byte syncModeOrd;
syncModeOrd = reader.readByte("syncMode");
@@ -851,7 +879,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 19:
+ case 20:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -859,7 +887,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 20:
+ case 21:
topLocked = reader.readBoolean("topLocked");
if (!reader.isLastRead())
@@ -867,7 +895,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 21:
+ case 22:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -875,7 +903,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 22:
+ case 23:
updateVer = reader.readMessage("updateVer");
if (!reader.isLastRead())
@@ -883,7 +911,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 23:
+ case 24:
vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -903,7 +931,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 24;
+ return 25;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 05b3c7b..221b230 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -362,13 +362,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
@Nullable TransactionIsolation isolation,
long accessTtl
) {
- assert tx == null || tx instanceof GridNearTxLocal;
+ assert tx == null || tx instanceof GridNearTxLocal : tx;
GridNearTxLocal txx = (GridNearTxLocal)tx;
CacheOperationContext opCtx = ctx.operationContextPerCall();
- GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx,
+ GridDhtColocatedLockFuture fut = new GridDhtColocatedLockFuture(ctx,
keys,
txx,
isRead,
@@ -619,7 +619,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @return Lock future.
*/
IgniteInternalFuture<Exception> lockAllAsync(
- final GridCacheContext<K, V> cacheCtx,
+ final GridCacheContext<?, ?> cacheCtx,
@Nullable final GridNearTxLocal tx,
final long threadId,
final GridCacheVersion ver,
@@ -700,7 +700,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @return Lock future.
*/
private IgniteInternalFuture<Exception> lockAllAsync0(
- GridCacheContext<K, V> cacheCtx,
+ GridCacheContext<?, ?> cacheCtx,
@Nullable final GridNearTxLocal tx,
long threadId,
final GridCacheVersion ver,
@@ -715,7 +715,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
int cnt = keys.size();
if (tx == null) {
- GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(ctx,
+ GridDhtLockFuture fut = new GridDhtLockFuture(ctx,
ctx.localNodeId(),
ver,
topVer,
@@ -838,7 +838,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
assert nodeId != null;
assert res != null;
- GridDhtColocatedLockFuture<K, V> fut = (GridDhtColocatedLockFuture<K, V>)ctx.mvcc().
+ GridDhtColocatedLockFuture fut = (GridDhtColocatedLockFuture)ctx.mvcc().
<Boolean>future(res.version(), res.futureId());
if (fut != null)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 372c517..c784948 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -46,7 +46,7 @@ import static org.apache.ignite.events.EventType.*;
/**
* Colocated cache lock future.
*/
-public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
+public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture<Boolean>
implements GridCacheFuture<Boolean> {
/** */
private static final long serialVersionUID = 0L;
@@ -59,7 +59,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
/** Cache registry. */
@GridToStringExclude
- private GridCacheContext<K, V> cctx;
+ private GridCacheContext<?, ?> cctx;
/** Lock owner thread. */
@GridToStringInclude
@@ -121,10 +121,10 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
* @param timeout Lock acquisition timeout.
* @param accessTtl TTL for read operation.
* @param filter Filter.
- * @param skipStore
+ * @param skipStore Skip store flag.
*/
public GridDhtColocatedLockFuture(
- GridCacheContext<K, V> cctx,
+ GridCacheContext<?, ?> cctx,
Collection<KeyCacheObject> keys,
@Nullable GridNearTxLocal tx,
boolean read,
@@ -326,13 +326,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
* Undoes all locks.
*
* @param dist If {@code true}, then remove locks from remote nodes as well.
+ * @param rollback {@code True} if should rollback tx.
*/
- private void undoLocks(boolean dist) {
+ private void undoLocks(boolean dist, boolean rollback) {
// Transactions will undo during rollback.
if (dist && tx == null)
cctx.colocated().removeLocks(threadId, lockVer, keys);
else {
- if (tx != null) {
+ if (rollback && tx != null) {
if (tx.setRollbackOnly()) {
if (log.isDebugEnabled())
log.debug("Marked transaction as rollback only because locks could not be acquired: " + tx);
@@ -346,16 +347,6 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
}
/**
- *
- * @param dist {@code True} if need to distribute lock release.
- */
- private void onFailed(boolean dist) {
- undoLocks(dist);
-
- complete(false);
- }
-
- /**
* @param success Success flag.
*/
public void complete(boolean success) {
@@ -475,7 +466,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
", fut=" + this + ']');
if (!success)
- undoLocks(distribute);
+ undoLocks(distribute, true);
if (tx != null)
cctx.tm().txContext(tx);
@@ -550,7 +541,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
// Continue mapping on the same topology version as it was before.
this.topVer.compareAndSet(null, topVer);
- map(keys);
+ map(keys, false);
markInitialized();
@@ -558,14 +549,17 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
}
// Must get topology snapshot and map on that version.
- mapOnTopology();
+ mapOnTopology(false, null);
}
/**
* Acquires topology future and checks it completeness under the read lock. If it is not complete,
* will asynchronously wait for it's completeness and then try again.
+ *
+ * @param remap Remap flag.
+ * @param c Optional closure to run after map.
*/
- private void mapOnTopology() {
+ private void mapOnTopology(final boolean remap, @Nullable final Runnable c) {
// We must acquire topology snapshot from the topology version future.
cctx.topology().readLock();
@@ -589,19 +583,30 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
AffinityTopologyVersion topVer = fut.topologyVersion();
- if (tx != null)
- tx.topologyVersion(topVer);
+ if (remap) {
+ if (tx != null)
+ tx.onRemap(topVer);
+
+ this.topVer.set(topVer);
+ }
+ else {
+ if (tx != null)
+ tx.topologyVersion(topVer);
+
+ this.topVer.compareAndSet(null, topVer);
+ }
- this.topVer.compareAndSet(null, topVer);
+ map(keys, remap);
- map(keys);
+ if (c != null)
+ c.run();
markInitialized();
}
else {
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- mapOnTopology();
+ mapOnTopology(remap, c);
}
});
}
@@ -617,8 +622,9 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
* groups belonging to one primary node and locks for these groups are acquired sequentially.
*
* @param keys Keys.
+ * @param remap Remap flag.
*/
- private void map(Collection<KeyCacheObject> keys) {
+ private void map(Collection<KeyCacheObject> keys, boolean remap) {
try {
AffinityTopologyVersion topVer = this.topVer.get();
@@ -633,8 +639,12 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
return;
}
+ boolean clientNode = cctx.kernalContext().clientNode();
+
+ assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
+
// First assume this node is primary for all keys passed in.
- if (mapAsPrimary(keys, topVer))
+ if (!clientNode && mapAsPrimary(keys, topVer))
return;
Deque<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>();
@@ -668,6 +678,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
boolean hasRmtNodes = false;
+ boolean first = true;
+
// Create mini futures.
for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
GridNearLockMapping mapping = iter.next();
@@ -736,6 +748,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
if (cand != null && !cand.reentry()) {
if (req == null) {
+ boolean clientFirst = false;
+
+ if (first) {
+ clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks());
+
+ first = false;
+ }
+
req = new GridNearLockRequest(
cctx.cacheId(),
topVer,
@@ -757,7 +777,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
inTx() ? tx.subjectId() : null,
inTx() ? tx.taskNameHash() : 0,
read ? accessTtl : -1L,
- skipStore);
+ skipStore,
+ clientFirst);
mapping.request(req);
}
@@ -815,7 +836,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
if (hasRmtNodes) {
trackable = true;
- if (!cctx.mvcc().addFuture(this))
+ if (!remap && !cctx.mvcc().addFuture(this))
throw new IllegalStateException("Duplicate future ID: " + this);
}
else
@@ -1249,75 +1270,111 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
return;
}
- int i = 0;
+ if (res.clientRemapVersion() != null) {
+ assert cctx.kernalContext().clientNode();
+
+ IgniteInternalFuture<?> affFut =
+ cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
+
+ if (affFut != null && !affFut.isDone()) {
+ affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ remap();
+ }
+ });
+ }
+ else
+ remap();
+ }
+ else {
+ int i = 0;
- for (KeyCacheObject k : keys) {
- IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k);
+ for (KeyCacheObject k : keys) {
+ IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k);
- CacheObject newVal = res.value(i);
+ CacheObject newVal = res.value(i);
- GridCacheVersion dhtVer = res.dhtVersion(i);
+ GridCacheVersion dhtVer = res.dhtVersion(i);
- if (newVal == null) {
- if (oldValTup != null) {
- if (oldValTup.get1().equals(dhtVer))
- newVal = oldValTup.get2();
+ if (newVal == null) {
+ if (oldValTup != null) {
+ if (oldValTup.get1().equals(dhtVer))
+ newVal = oldValTup.get2();
+ }
}
- }
- if (inTx()) {
- IgniteTxEntry txEntry = tx.entry(cctx.txKey(k));
+ if (inTx()) {
+ IgniteTxEntry txEntry = tx.entry(cctx.txKey(k));
+
+ // In colocated cache we must receive responses only for detached entries.
+ assert txEntry.cached().detached() : txEntry;
- // In colocated cache we must receive responses only for detached entries.
- assert txEntry.cached().detached();
+ txEntry.markLocked();
- txEntry.markLocked();
+ GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached();
- GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached();
+ if (res.dhtVersion(i) == null) {
+ onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+ "(will fail the lock): " + res));
+
+ return;
+ }
- if (res.dhtVersion(i) == null) {
- onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
- "(will fail the lock): " + res));
+ // Set value to detached entry.
+ entry.resetFromPrimary(newVal, dhtVer);
- return;
+ tx.hasRemoteLocks(true);
+
+ if (log.isDebugEnabled())
+ log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+ }
+ else
+ cctx.mvcc().markExplicitOwner(k, threadId);
+
+ if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+ cctx.events().addEvent(cctx.affinity().partition(k),
+ k,
+ tx,
+ null,
+ EVT_CACHE_OBJECT_READ,
+ newVal,
+ newVal != null,
+ null,
+ false,
+ CU.subjectId(tx, cctx.shared()),
+ null,
+ tx == null ? null : tx.resolveTaskName());
}
- // Set value to detached entry.
- entry.resetFromPrimary(newVal, dhtVer);
+ i++;
+ }
- if (log.isDebugEnabled())
- log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+ try {
+ proceedMapping(mappings);
}
- else
- cctx.mvcc().markExplicitOwner(k, threadId);
-
- if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
- cctx.events().addEvent(cctx.affinity().partition(k),
- k,
- tx,
- null,
- EVT_CACHE_OBJECT_READ,
- newVal,
- newVal != null,
- null,
- false,
- CU.subjectId(tx, cctx.shared()),
- null,
- tx == null ? null : tx.resolveTaskName());
+ catch (IgniteCheckedException e) {
+ onDone(e);
}
- i++;
+ onDone(true);
}
+ }
+ }
- try {
- proceedMapping(mappings);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
+ /**
+ * Undoes locks, removes explicit locks (by thread ID and lock version) for all keys, then maps on topology with remap flag.
+ */
+ private void remap() {
+ undoLocks(false, false);
- onDone(true);
- }
+ for (KeyCacheObject key : GridDhtColocatedLockFuture.this.keys)
+ cctx.mvcc().removeExplicitLock(threadId, key, lockVer);
+
+ mapOnTopology(true, new Runnable() {
+ @Override public void run() {
+ onDone(true);
+ }
+ });
+ }
/** {@inheritDoc} */