You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/21 16:40:41 UTC
[2/3] incubator-ignite git commit: # ignite-23 remap for tx updates from client
# ignite-23 remap for tx updates from client
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1d413965
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1d413965
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1d413965
Branch: refs/heads/ignite-23
Commit: 1d413965d06cd8188df39115701a69761f7ea998
Parents: 12761e4
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 21 11:54:43 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 21 17:39:30 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheIoManager.java | 8 +-
.../GridCachePartitionExchangeManager.java | 44 ++--
.../distributed/GridDistributedTxMapping.java | 17 ++
.../distributed/dht/GridDhtLockFuture.java | 10 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 222 ++++++++++++----
.../distributed/dht/GridDhtTxLocalAdapter.java | 8 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 3 +-
.../dht/colocated/GridDhtColocatedCache.java | 12 +-
.../colocated/GridDhtColocatedLockFuture.java | 184 ++++++++-----
.../GridDhtPartitionsExchangeFuture.java | 16 +-
.../distributed/near/GridNearLockFuture.java | 259 +++++++++++-------
.../distributed/near/GridNearLockMapping.java | 17 ++
.../distributed/near/GridNearLockRequest.java | 68 +++--
.../distributed/near/GridNearLockResponse.java | 48 +++-
.../near/GridNearOptimisticTxPrepareFuture.java | 77 ++++--
.../GridNearPessimisticTxPrepareFuture.java | 5 +-
.../near/GridNearTransactionalCache.java | 4 +-
.../cache/distributed/near/GridNearTxLocal.java | 43 ++-
.../near/GridNearTxPrepareRequest.java | 72 +++--
.../near/GridNearTxPrepareResponse.java | 70 +++--
.../cache/transactions/IgniteInternalTx.java | 5 +
.../cache/transactions/IgniteTxAdapter.java | 15 +-
.../cache/transactions/IgniteTxHandler.java | 118 +++++++--
...niteCacheClientNodeChangingTopologyTest.java | 263 ++++++++++++++++++-
24 files changed, 1182 insertions(+), 406 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 02f16c0..eef9fde 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -472,7 +472,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
req.miniId(),
false,
0,
- req.classError());
+ req.classError(),
+ null);
sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
}
@@ -488,7 +489,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
req.miniId(),
req.version(),
req.version(),
- null, null, null);
+ null,
+ null,
+ null,
+ null);
res.error(req.classError());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 25e18db..41a13ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -706,9 +706,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
GridDhtPartitionsExchangeFuture old = exchFuts.addx(
fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs));
- if (old != null)
+ if (old != null) {
fut = old;
+ if (reqs != null)
+ fut.cacheChangeRequests(reqs);
+ }
+
if (discoEvt != null)
fut.onEvent(exchId, discoEvt);
@@ -870,17 +874,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
else {
if (msg.client()) {
- IgniteInternalFuture<?> fut = affinityReadyFuture(msg.exchangeId().topologyVersion());
-
- if (fut != null) {
- fut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
- processSinglePartitionClientUpdate(node, msg);
- }
- });
- }
- else
- processSinglePartitionClientUpdate(node, msg);
+ final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
+ null,
+ null);
+
+ exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ // Finished future should reply only to sender client node.
+ exchFut.onReceive(node.id(), msg);
+ }
+ });
}
else
exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
@@ -892,23 +895,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param node Node.
- * @param msg Message.
- */
- private void processSinglePartitionClientUpdate(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
- final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
- null,
- null);
-
- exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
- // Finished future should reply only to sender client node.
- exchFut.onReceive(node.id(), msg);
- }
- });
- }
-
- /**
* @param node Node ID.
* @param msg Message.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index fded3c9..bd1dedf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -63,6 +63,9 @@ public class GridDistributedTxMapping implements Externalizable {
/** {@code True} if mapping is for near caches, {@code false} otherwise. */
private boolean near;
+ /** {@code True} if this is first mapping for optimistic tx on client node. */
+ private boolean clientFirst;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -108,6 +111,20 @@ public class GridDistributedTxMapping implements Externalizable {
}
/**
+ * @return {@code True} if this is first mapping for optimistic tx on client node.
+ */
+ public boolean clientFirst() {
+ return clientFirst;
+ }
+
+ /**
+ * @param clientFirst {@code True} if this is first mapping for optimistic tx on client node.
+ */
+ public void clientFirst(boolean clientFirst) {
+ this.clientFirst = clientFirst;
+ }
+
+ /**
* @return {@code True} if mapping is for near caches, {@code false} otherwise.
*/
public boolean near() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index c57eded..bdaa552 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -47,7 +47,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*;
/**
* Cache lock future.
*/
-public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
+public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
implements GridCacheMvccFuture<Boolean>, GridDhtFuture<Boolean>, GridCacheMappedVersion {
/** */
private static final long serialVersionUID = 0L;
@@ -60,7 +60,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
/** Cache registry. */
@GridToStringExclude
- private GridCacheContext<K, V> cctx;
+ private GridCacheContext<?, ?> cctx;
/** Near node ID. */
private UUID nearNodeId;
@@ -151,7 +151,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
* @param skipStore Skip store flag.
*/
public GridDhtLockFuture(
- GridCacheContext<K, V> cctx,
+ GridCacheContext<?, ?> cctx,
UUID nearNodeId,
GridCacheVersion nearLockVer,
@NotNull AffinityTopologyVersion topVer,
@@ -221,7 +221,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
* @param cacheCtx Cache context.
* @param invalidPart Partition to retry.
*/
- void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int invalidPart) {
+ void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, int invalidPart) {
invalidParts.add(invalidPart);
// Register invalid partitions with transaction.
@@ -1170,7 +1170,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
* @param entries Entries to check.
*/
@SuppressWarnings({"ForLoopReplaceableByForEach"})
- private void evictReaders(GridCacheContext<K, V> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId,
+ private void evictReaders(GridCacheContext<?, ?> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId,
@Nullable List<GridDhtCacheEntry> entries) {
if (entries == null || keys == null || entries.isEmpty() || keys.isEmpty())
return;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 26eef50..60e891c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -518,7 +518,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
return;
}
- // Group lock can be only started from local node, so we never start group lock transaction on remote node.
IgniteInternalFuture<?> f = lockAllAsync(ctx, nearNode, req, null);
// Register listener just so we print out errors.
@@ -534,8 +533,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse res) {
assert nodeId != null;
assert res != null;
- GridDhtLockFuture<K, V> fut = (GridDhtLockFuture<K, V>)ctx.mvcc().<Boolean>future(res.version(),
- res.futureId());
+ GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>future(res.version(), res.futureId());
if (fut == null) {
if (log.isDebugEnabled())
@@ -604,7 +602,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
assert tx != null;
- GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(
+ GridDhtLockFuture fut = new GridDhtLockFuture(
ctx,
tx.nearNodeId(),
tx.nearXidVersion(),
@@ -669,7 +667,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
* @return Future.
*/
public IgniteInternalFuture<GridNearLockResponse> lockAllAsync(
- final GridCacheContext<K, V> cacheCtx,
+ final GridCacheContext<?, ?> cacheCtx,
final ClusterNode nearNode,
final GridNearLockRequest req,
@Nullable final CacheEntryPredicate[] filter0) {
@@ -719,26 +717,57 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (filter == null)
filter = req.filter();
- GridDhtLockFuture<K, V> fut = null;
+ GridDhtLockFuture fut = null;
if (!req.inTx()) {
- fut = new GridDhtLockFuture<>(ctx,
- nearNode.id(),
- req.version(),
- req.topologyVersion(),
- cnt,
- req.txRead(),
- req.needReturnValue(),
- req.timeout(),
- tx,
- req.threadId(),
- req.accessTtl(),
- filter,
- req.skipStore());
+ GridDhtPartitionTopology top = null;
+
+ if (req.firstClientRequest()) {
+ assert CU.clientNode(nearNode);
+
+ top = topology();
+
+ topology().readLock();
+ }
+
+ try {
+ if (top != null && !top.topologyVersion().equals(req.topologyVersion())) {
+ if (log.isDebugEnabled()) {
+ log.debug("Client topology version mismatch, need remap lock request [" +
+ "reqTopVer=" + req.topologyVersion() +
+ ", locTopVer=" + top.topologyVersion() +
+ ", req=" + req + ']');
+ }
+
+ GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
+ req,
+ top.topologyVersion());
+
+ return new GridFinishedFuture<>(res);
+ }
+
+ fut = new GridDhtLockFuture(ctx,
+ nearNode.id(),
+ req.version(),
+ req.topologyVersion(),
+ cnt,
+ req.txRead(),
+ req.needReturnValue(),
+ req.timeout(),
+ tx,
+ req.threadId(),
+ req.accessTtl(),
+ filter,
+ req.skipStore());
- // Add before mapping.
- if (!ctx.mvcc().addFuture(fut))
- throw new IllegalStateException("Duplicate future ID: " + fut);
+ // Add before mapping.
+ if (!ctx.mvcc().addFuture(fut))
+ throw new IllegalStateException("Duplicate future ID: " + fut);
+ }
+ finally {
+ if (top != null)
+ top.readUnlock();
+ }
}
boolean timedout = false;
@@ -788,45 +817,76 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
// Handle implicit locks for pessimistic transactions.
if (req.inTx()) {
if (tx == null) {
- tx = new GridDhtTxLocal(
- ctx.shared(),
- nearNode.id(),
- req.version(),
- req.futureId(),
- req.miniId(),
- req.threadId(),
- req.implicitTx(),
- req.implicitSingleTx(),
- ctx.systemTx(),
- false,
- ctx.ioPolicy(),
- PESSIMISTIC,
- req.isolation(),
- req.timeout(),
- req.isInvalidate(),
- false,
- req.txSize(),
- null,
- req.subjectId(),
- req.taskNameHash());
+ GridDhtPartitionTopology top = null;
- tx.syncCommit(req.syncCommit());
+ if (req.firstClientRequest()) {
+ assert CU.clientNode(nearNode);
- tx = ctx.tm().onCreated(null, tx);
+ top = topology();
- if (tx == null || !tx.init()) {
- String msg = "Failed to acquire lock (transaction has been completed): " +
- req.version();
+ topology().readLock();
+ }
- U.warn(log, msg);
+ try {
+ if (top != null && !top.topologyVersion().equals(req.topologyVersion())) {
+ if (log.isDebugEnabled()) {
+ log.debug("Client topology version mismatch, need remap lock request [" +
+ "reqTopVer=" + req.topologyVersion() +
+ ", locTopVer=" + top.topologyVersion() +
+ ", req=" + req + ']');
+ }
- if (tx != null)
- tx.rollback();
+ GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
+ req,
+ top.topologyVersion());
- return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
- }
+ return new GridFinishedFuture<>(res);
+ }
- tx.topologyVersion(req.topologyVersion());
+ tx = new GridDhtTxLocal(
+ ctx.shared(),
+ nearNode.id(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.threadId(),
+ req.implicitTx(),
+ req.implicitSingleTx(),
+ ctx.systemTx(),
+ false,
+ ctx.ioPolicy(),
+ PESSIMISTIC,
+ req.isolation(),
+ req.timeout(),
+ req.isInvalidate(),
+ false,
+ req.txSize(),
+ null,
+ req.subjectId(),
+ req.taskNameHash());
+
+ tx.syncCommit(req.syncCommit());
+
+ tx = ctx.tm().onCreated(null, tx);
+
+ if (tx == null || !tx.init()) {
+ String msg = "Failed to acquire lock (transaction has been completed): " +
+ req.version();
+
+ U.warn(log, msg);
+
+ if (tx != null)
+ tx.rollback();
+
+ return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
+ }
+
+ tx.topologyVersion(req.topologyVersion());
+ }
+ finally {
+ if (top != null)
+ top.readUnlock();
+ }
}
ctx.tm().txContext(tx);
@@ -947,6 +1007,42 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
}
/**
+ * @param nearNode Client node.
+ * @param req Request.
+ * @param topVer Remap version.
+ * @return Response.
+ */
+ private GridNearLockResponse sendClientLockRemapResponse(ClusterNode nearNode,
+ GridNearLockRequest req,
+ AffinityTopologyVersion topVer) {
+ assert topVer != null;
+
+ GridNearLockResponse res = new GridNearLockResponse(
+ ctx.cacheId(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ false,
+ 0,
+ null,
+ topVer);
+
+ try {
+ ctx.io().send(nearNode, res, ctx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send client lock remap response, client node failed " +
+ "[node=" + nearNode + ", req=" + req + ']');
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send client lock remap response [node=" + nearNode + ", req=" + req + ']', e);
+ }
+
+ return res;
+ }
+
+ /**
* @param nearNode Near node.
* @param entries Entries.
* @param req Lock request.
@@ -968,7 +1064,13 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
try {
// Send reply back to originating near node.
GridNearLockResponse res = new GridNearLockResponse(ctx.cacheId(),
- req.version(), req.futureId(), req.miniId(), tx != null && tx.onePhaseCommit(), entries.size(), err);
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ tx != null && tx.onePhaseCommit(),
+ entries.size(),
+ err,
+ null);
if (err == null) {
res.pending(localDhtPendingVersions(entries, mappedVer));
@@ -1077,8 +1179,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
U.error(log, "Failed to get value for lock reply message for node [node=" +
U.toShortString(nearNode) + ", req=" + req + ']', e);
- return new GridNearLockResponse(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), false,
- entries.size(), e);
+ return new GridNearLockResponse(ctx.cacheId(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ false,
+ entries.size(),
+ e,
+ null);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 54b59b8..90edb0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -52,15 +52,13 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
private static final long serialVersionUID = 0L;
/** Near mappings. */
- protected Map<UUID, GridDistributedTxMapping> nearMap =
- new ConcurrentHashMap8<>();
+ protected Map<UUID, GridDistributedTxMapping> nearMap = new ConcurrentHashMap8<>();
/** DHT mappings. */
- protected Map<UUID, GridDistributedTxMapping> dhtMap =
- new ConcurrentHashMap8<>();
+ protected Map<UUID, GridDistributedTxMapping> dhtMap = new ConcurrentHashMap8<>();
/** Mapped flag. */
- private AtomicBoolean mapped = new AtomicBoolean();
+ protected AtomicBoolean mapped = new AtomicBoolean();
/** */
private long dhtThreadId;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 293cf95..af0fbdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -582,7 +582,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
tx.writeVersion(),
tx.invalidPartitions(),
ret,
- prepErr);
+ prepErr,
+ null);
if (prepErr == null) {
addDhtValues(res);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 05b3c7b..221b230 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -362,13 +362,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
@Nullable TransactionIsolation isolation,
long accessTtl
) {
- assert tx == null || tx instanceof GridNearTxLocal;
+ assert tx == null || tx instanceof GridNearTxLocal : tx;
GridNearTxLocal txx = (GridNearTxLocal)tx;
CacheOperationContext opCtx = ctx.operationContextPerCall();
- GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx,
+ GridDhtColocatedLockFuture fut = new GridDhtColocatedLockFuture(ctx,
keys,
txx,
isRead,
@@ -619,7 +619,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @return Lock future.
*/
IgniteInternalFuture<Exception> lockAllAsync(
- final GridCacheContext<K, V> cacheCtx,
+ final GridCacheContext<?, ?> cacheCtx,
@Nullable final GridNearTxLocal tx,
final long threadId,
final GridCacheVersion ver,
@@ -700,7 +700,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @return Lock future.
*/
private IgniteInternalFuture<Exception> lockAllAsync0(
- GridCacheContext<K, V> cacheCtx,
+ GridCacheContext<?, ?> cacheCtx,
@Nullable final GridNearTxLocal tx,
long threadId,
final GridCacheVersion ver,
@@ -715,7 +715,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
int cnt = keys.size();
if (tx == null) {
- GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(ctx,
+ GridDhtLockFuture fut = new GridDhtLockFuture(ctx,
ctx.localNodeId(),
ver,
topVer,
@@ -838,7 +838,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
assert nodeId != null;
assert res != null;
- GridDhtColocatedLockFuture<K, V> fut = (GridDhtColocatedLockFuture<K, V>)ctx.mvcc().
+ GridDhtColocatedLockFuture fut = (GridDhtColocatedLockFuture)ctx.mvcc().
<Boolean>future(res.version(), res.futureId());
if (fut != null)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 372c517..a90c6e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -46,7 +46,7 @@ import static org.apache.ignite.events.EventType.*;
/**
* Colocated cache lock future.
*/
-public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
+public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture<Boolean>
implements GridCacheFuture<Boolean> {
/** */
private static final long serialVersionUID = 0L;
@@ -59,7 +59,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
/** Cache registry. */
@GridToStringExclude
- private GridCacheContext<K, V> cctx;
+ private GridCacheContext<?, ?> cctx;
/** Lock owner thread. */
@GridToStringInclude
@@ -121,10 +121,10 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
* @param timeout Lock acquisition timeout.
* @param accessTtl TTL for read operation.
* @param filter Filter.
- * @param skipStore
+ * @param skipStore Skip store flag.
*/
public GridDhtColocatedLockFuture(
- GridCacheContext<K, V> cctx,
+ GridCacheContext<?, ?> cctx,
Collection<KeyCacheObject> keys,
@Nullable GridNearTxLocal tx,
boolean read,
@@ -550,7 +550,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
// Continue mapping on the same topology version as it was before.
this.topVer.compareAndSet(null, topVer);
- map(keys);
+ map(keys, false);
markInitialized();
@@ -558,14 +558,16 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
}
// Must get topology snapshot and map on that version.
- mapOnTopology();
+ mapOnTopology(false);
}
/**
* Acquires topology future and checks it completeness under the read lock. If it is not complete,
* will asynchronously wait for it's completeness and then try again.
+ *
+ * @param remap Remap flag.
*/
- private void mapOnTopology() {
+ private void mapOnTopology(final boolean remap) {
// We must acquire topology snapshot from the topology version future.
cctx.topology().readLock();
@@ -589,19 +591,27 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
AffinityTopologyVersion topVer = fut.topologyVersion();
- if (tx != null)
- tx.topologyVersion(topVer);
+ if (remap) {
+ if (tx != null)
+ tx.onRemap(topVer);
+
+ this.topVer.set(topVer);
+ }
+ else {
+ if (tx != null)
+ tx.topologyVersion(topVer);
- this.topVer.compareAndSet(null, topVer);
+ this.topVer.compareAndSet(null, topVer);
+ }
- map(keys);
+ map(keys, remap);
markInitialized();
}
else {
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- mapOnTopology();
+ mapOnTopology(remap);
}
});
}
@@ -617,8 +627,9 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
* groups belonging to one primary node and locks for these groups are acquired sequentially.
*
* @param keys Keys.
+ * @param remap Remap flag.
*/
- private void map(Collection<KeyCacheObject> keys) {
+ private void map(Collection<KeyCacheObject> keys, boolean remap) {
try {
AffinityTopologyVersion topVer = this.topVer.get();
@@ -633,8 +644,12 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
return;
}
+ boolean clientNode = cctx.kernalContext().clientNode();
+
+ assert !remap || (clientNode && !tx.hasRemoteLocks());
+
// First assume this node is primary for all keys passed in.
- if (mapAsPrimary(keys, topVer))
+ if (!clientNode && mapAsPrimary(keys, topVer))
return;
Deque<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>();
@@ -642,9 +657,18 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
// Assign keys to primary nodes.
GridNearLockMapping map = null;
+ boolean first = true;
+
for (KeyCacheObject key : keys) {
GridNearLockMapping updated = map(key, map, topVer);
+ if (first) {
+ if (clientNode)
+ updated.clientFirst(tx == null || !tx.hasRemoteLocks());
+
+ first = false;
+ }
+
// If new mapping was created, add to collection.
if (updated != map) {
mappings.add(updated);
@@ -757,7 +781,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
inTx() ? tx.subjectId() : null,
inTx() ? tx.taskNameHash() : 0,
read ? accessTtl : -1L,
- skipStore);
+ skipStore,
+ mapping.clientFirst());
mapping.request(req);
}
@@ -815,7 +840,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
if (hasRmtNodes) {
trackable = true;
- if (!cctx.mvcc().addFuture(this))
+ if (!remap && !cctx.mvcc().addFuture(this))
throw new IllegalStateException("Duplicate future ID: " + this);
}
else
@@ -1249,75 +1274,104 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
return;
}
- int i = 0;
+ if (res.clientRemapVersion() != null) {
+ assert cctx.kernalContext().clientNode();
+
+ IgniteInternalFuture<?> affFut =
+ cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
+
+ if (affFut != null && !affFut.isDone()) {
+ affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ remap();
+ }
+ });
+ }
+ else
+ remap();
+ }
+ else {
+ int i = 0;
- for (KeyCacheObject k : keys) {
- IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k);
+ for (KeyCacheObject k : keys) {
+ IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k);
- CacheObject newVal = res.value(i);
+ CacheObject newVal = res.value(i);
- GridCacheVersion dhtVer = res.dhtVersion(i);
+ GridCacheVersion dhtVer = res.dhtVersion(i);
- if (newVal == null) {
- if (oldValTup != null) {
- if (oldValTup.get1().equals(dhtVer))
- newVal = oldValTup.get2();
+ if (newVal == null) {
+ if (oldValTup != null) {
+ if (oldValTup.get1().equals(dhtVer))
+ newVal = oldValTup.get2();
+ }
}
- }
- if (inTx()) {
- IgniteTxEntry txEntry = tx.entry(cctx.txKey(k));
+ if (inTx()) {
+ IgniteTxEntry txEntry = tx.entry(cctx.txKey(k));
- // In colocated cache we must receive responses only for detached entries.
- assert txEntry.cached().detached();
+ // In colocated cache we must receive responses only for detached entries.
+ assert txEntry.cached().detached() : txEntry;
- txEntry.markLocked();
+ txEntry.markLocked();
- GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached();
+ GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached();
- if (res.dhtVersion(i) == null) {
- onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
- "(will fail the lock): " + res));
+ if (res.dhtVersion(i) == null) {
+ onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+ "(will fail the lock): " + res));
- return;
+ return;
+ }
+
+ // Set value to detached entry.
+ entry.resetFromPrimary(newVal, dhtVer);
+
+ tx.hasRemoteLocks(true);
+
+ if (log.isDebugEnabled())
+ log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+ }
+ else
+ cctx.mvcc().markExplicitOwner(k, threadId);
+
+ if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+ cctx.events().addEvent(cctx.affinity().partition(k),
+ k,
+ tx,
+ null,
+ EVT_CACHE_OBJECT_READ,
+ newVal,
+ newVal != null,
+ null,
+ false,
+ CU.subjectId(tx, cctx.shared()),
+ null,
+ tx == null ? null : tx.resolveTaskName());
}
- // Set value to detached entry.
- entry.resetFromPrimary(newVal, dhtVer);
+ i++;
+ }
- if (log.isDebugEnabled())
- log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+ try {
+ proceedMapping(mappings);
}
- else
- cctx.mvcc().markExplicitOwner(k, threadId);
-
- if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
- cctx.events().addEvent(cctx.affinity().partition(k),
- k,
- tx,
- null,
- EVT_CACHE_OBJECT_READ,
- newVal,
- newVal != null,
- null,
- false,
- CU.subjectId(tx, cctx.shared()),
- null,
- tx == null ? null : tx.resolveTaskName());
+ catch (IgniteCheckedException e) {
+ onDone(e);
}
- i++;
+ onDone(true);
}
+ }
+ }
- try {
- proceedMapping(mappings);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
+ /**
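+ * Maps keys again in remap mode via {@code mapOnTopology(true)}, then completes this future with {@code true}.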
+ */
+ private void remap() {
+ mapOnTopology(true);
- onDone(true);
- }
+ onDone(true);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 94ca540..af7fa5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -229,15 +229,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
initFut = new GridFutureAdapter<>();
- // Grab all nodes with order of equal or less than last joined node.
- ClusterNode node = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion());
-
- oldestNode.set(node);
-
if (log.isDebugEnabled())
log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + ", fut=" + this + ']');
}
+ /**
+ * @param reqs Cache change requests.
+ */
+ public void cacheChangeRequests(Collection<DynamicCacheChangeRequest> reqs) {
+ this.reqs = reqs;
+ }
+
/** {@inheritDoc} */
@Override public GridDiscoveryTopologySnapshot topologySnapshot() throws IgniteCheckedException {
get();
@@ -461,6 +463,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
assert discoEvt != null : this;
assert !dummy && !forcePreload : this;
+ ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion());
+
+ oldestNode.set(oldest);
+
startCaches();
// True if client node joined or failed.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 0ffb4e5..92498f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -45,7 +45,7 @@ import static org.apache.ignite.events.EventType.*;
/**
* Cache lock future.
*/
-public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
+public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean>
implements GridCacheMvccFuture<Boolean> {
/** */
private static final long serialVersionUID = 0L;
@@ -58,7 +58,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
/** Cache registry. */
@GridToStringExclude
- private GridCacheContext<K, V> cctx;
+ private GridCacheContext<?, ?> cctx;
/** Lock owner thread. */
@GridToStringInclude
@@ -135,7 +135,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
* @param skipStore skipStore
*/
public GridNearLockFuture(
- GridCacheContext<K, V> cctx,
+ GridCacheContext<?, ?> cctx,
Collection<KeyCacheObject> keys,
@Nullable GridNearTxLocal tx,
boolean read,
@@ -184,15 +184,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
* @return Participating nodes.
*/
@Override public Collection<? extends ClusterNode> nodes() {
- return
- F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
- @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
- if (isMini(f))
- return ((MiniFuture)f).node();
+ return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
+ @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
+ if (isMini(f))
+ return ((MiniFuture)f).node();
- return cctx.discovery().localNode();
- }
- });
+ return cctx.discovery().localNode();
+ }
+ });
}
/** {@inheritDoc} */
@@ -682,7 +681,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
// Continue mapping on the same topology version as it was before.
this.topVer.compareAndSet(null, topVer);
- map(keys);
+ map(keys, false);
markInitialized();
@@ -690,14 +689,16 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
}
// Must get topology snapshot and map on that version.
- mapOnTopology();
+ mapOnTopology(false);
}
/**
* Acquires topology future and checks it completeness under the read lock. If it is not complete,
* will asynchronously wait for it's completeness and then try again.
+ *
+ * @param remap Remap flag.
*/
- void mapOnTopology() {
+ void mapOnTopology(final boolean remap) {
// We must acquire topology snapshot from the topology version future.
cctx.topology().readLock();
@@ -721,19 +722,27 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
AffinityTopologyVersion topVer = fut.topologyVersion();
- if (tx != null)
- tx.topologyVersion(topVer);
+ if (remap) {
+ if (tx != null)
+ tx.onRemap(topVer);
- this.topVer.compareAndSet(null, topVer);
+ this.topVer.set(topVer);
+ }
+ else {
+ if (tx != null)
+ tx.topologyVersion(topVer);
+
+ this.topVer.compareAndSet(null, topVer);
+ }
- map(keys);
+ map(keys, remap);
markInitialized();
}
else {
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- mapOnTopology();
+ mapOnTopology(remap);
}
});
}
@@ -749,14 +758,15 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
* groups belonging to one primary node and locks for these groups are acquired sequentially.
*
* @param keys Keys.
+ * @param remap Remap flag.
*/
- private void map(Iterable<KeyCacheObject> keys) {
+ private void map(Iterable<KeyCacheObject> keys, boolean remap) {
try {
AffinityTopologyVersion topVer = this.topVer.get();
assert topVer != null;
- assert topVer.topologyVersion() > 0;
+ assert topVer.topologyVersion() > 0 : topVer;
if (CU.affinityNodes(cctx, topVer).isEmpty()) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for near-only cache (all " +
@@ -765,15 +775,27 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
return;
}
- ConcurrentLinkedDeque8<GridNearLockMapping> mappings =
- new ConcurrentLinkedDeque8<>();
+ boolean clientNode = cctx.kernalContext().clientNode();
+
+ assert !remap || (clientNode && !tx.hasRemoteLocks());
+
+ ConcurrentLinkedDeque8<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>();
// Assign keys to primary nodes.
GridNearLockMapping map = null;
+ boolean first = true;
+
for (KeyCacheObject key : keys) {
GridNearLockMapping updated = map(key, map, topVer);
+ if (first) {
+ if (clientNode)
+ updated.clientFirst(tx == null || !tx.hasRemoteLocks());
+
+ first = false;
+ }
+
// If new mapping was created, add to collection.
if (updated != map) {
mappings.add(updated);
@@ -893,7 +915,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
inTx() ? tx.subjectId() : null,
inTx() ? tx.taskNameHash() : 0,
read ? accessTtl : -1L,
- skipStore);
+ skipStore,
+ mapping.clientFirst());
mapping.request(req);
}
@@ -1197,7 +1220,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
/**
* @return DHT cache.
*/
- private GridDhtTransactionalCacheAdapter<K, V> dht() {
+ private GridDhtTransactionalCacheAdapter<?, ?> dht() {
return cctx.nearTx().dht();
}
@@ -1356,110 +1379,144 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
return;
}
- int i = 0;
+ if (res.clientRemapVersion() != null) {
+ assert cctx.kernalContext().clientNode();
- AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get();
+ IgniteInternalFuture<?> affFut =
+ cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
- for (KeyCacheObject k : keys) {
- while (true) {
- GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
+ if (affFut != null && !affFut.isDone()) {
+ affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ remap();
+ }
+ });
+ }
+ else
+ remap();
+ }
+ else {
+ int i = 0;
- try {
- if (res.dhtVersion(i) == null) {
- onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
- "(will fail the lock): " + res));
+ AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get();
- return;
- }
+ for (KeyCacheObject k : keys) {
+ while (true) {
+ GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
- IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
+ try {
+ if (res.dhtVersion(i) == null) {
+ onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+ "(will fail the lock): " + res));
- CacheObject oldVal = entry.rawGet();
- boolean hasOldVal = false;
- CacheObject newVal = res.value(i);
+ return;
+ }
- boolean readRecordable = false;
+ IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
- if (retval) {
- readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
+ CacheObject oldVal = entry.rawGet();
+ boolean hasOldVal = false;
+ CacheObject newVal = res.value(i);
- if (readRecordable)
- hasOldVal = entry.hasValue();
- }
+ boolean readRecordable = false;
- GridCacheVersion dhtVer = res.dhtVersion(i);
- GridCacheVersion mappedVer = res.mappedVersion(i);
+ if (retval) {
+ readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
+
+ if (readRecordable)
+ hasOldVal = entry.hasValue();
+ }
- if (newVal == null) {
- if (oldValTup != null) {
- if (oldValTup.get1().equals(dhtVer))
- newVal = oldValTup.get2();
+ GridCacheVersion dhtVer = res.dhtVersion(i);
+ GridCacheVersion mappedVer = res.mappedVersion(i);
- oldVal = oldValTup.get2();
+ if (newVal == null) {
+ if (oldValTup != null) {
+ if (oldValTup.get1().equals(dhtVer))
+ newVal = oldValTup.get2();
+
+ oldVal = oldValTup.get2();
+ }
}
- }
- // Lock is held at this point, so we can set the
- // returned value if any.
- entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
+ // Lock is held at this point, so we can set the
+ // returned value if any.
+ entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
- if (inTx() && implicitTx() && tx.onePhaseCommit()) {
- boolean pass = res.filterResult(i);
+ if (inTx()) {
+ tx.hasRemoteLocks(true);
- tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
- }
+ if (implicitTx() && tx.onePhaseCommit()) {
+ boolean pass = res.filterResult(i);
- entry.readyNearLock(lockVer, mappedVer, res.committedVersions(), res.rolledbackVersions(),
- res.pending());
-
- if (retval) {
- if (readRecordable)
- cctx.events().addEvent(
- entry.partition(),
- entry.key(),
- tx,
- null,
- EVT_CACHE_OBJECT_READ,
- newVal,
- newVal != null,
- oldVal,
- hasOldVal,
- CU.subjectId(tx, cctx.shared()),
- null,
- inTx() ? tx.resolveTaskName() : null);
-
- if (cctx.cache().configuration().isStatisticsEnabled())
- cctx.cache().metrics0().onRead(false);
- }
+ tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
+ }
+ }
- if (log.isDebugEnabled())
- log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+ entry.readyNearLock(lockVer,
+ mappedVer,
+ res.committedVersions(),
+ res.rolledbackVersions(),
+ res.pending());
+
+ if (retval) {
+ if (readRecordable)
+ cctx.events().addEvent(
+ entry.partition(),
+ entry.key(),
+ tx,
+ null,
+ EVT_CACHE_OBJECT_READ,
+ newVal,
+ newVal != null,
+ oldVal,
+ hasOldVal,
+ CU.subjectId(tx, cctx.shared()),
+ null,
+ inTx() ? tx.resolveTaskName() : null);
+
+ if (cctx.cache().configuration().isStatisticsEnabled())
+ cctx.cache().metrics0().onRead(false);
+ }
- break; // Inner while loop.
- }
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to add candidates because entry was removed (will renew).");
+ if (log.isDebugEnabled())
+ log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
- // Replace old entry with new one.
- entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
+ break; // Inner while loop.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to add candidates because entry was removed (will renew).");
+
+ // Replace old entry with new one.
+ entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
+ }
}
+
+ i++;
}
- i++;
- }
+ try {
+ proceedMapping(mappings);
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
- try {
- proceedMapping(mappings);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
+ onDone(true);
}
-
- onDone(true);
}
}
+ /**
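+ * Maps keys again in remap mode via {@code mapOnTopology(true)}, then completes this future with {@code true}.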
+ */
+ private void remap() {
+ mapOnTopology(true);
+
+ onDone(true);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(MiniFuture.class, this, "node", node.id(), "super", super.toString());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java
index 51000ef..3ea5b7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java
@@ -44,6 +44,9 @@ public class GridNearLockMapping {
@GridToStringInclude
private Collection<KeyCacheObject> distributedKeys;
+ /** {@code True} if this is first mapping for lock operation on client node. */
+ private boolean clientFirst;
+
/**
* Creates near lock mapping for specified node and key.
*
@@ -60,6 +63,20 @@ public class GridNearLockMapping {
}
/**
+ * @return {@code True} if this is first mapping for lock operation on client node.
+ */
+ public boolean clientFirst() {
+ return clientFirst;
+ }
+
+ /**
+ * @param clientFirst {@code True} if this is first mapping for lock operation on client node.
+ */
+ public void clientFirst(boolean clientFirst) {
+ this.clientFirst = clientFirst;
+ }
+
+ /**
* @return Node to which keys are mapped.
*/
public ClusterNode node() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index e71dd65..81184a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -80,6 +80,9 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
/** Flag indicating whether cache operation requires a previous value. */
private boolean retVal;
+ /** {@code True} if first lock request for lock operation sent from client node. */
+ private boolean firstClientReq;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -98,6 +101,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
* @param implicitTx Flag to indicate that transaction is implicit.
* @param implicitSingleTx Implicit-transaction-with-one-key flag.
* @param isRead Indicates whether implicit lock is for read or write operation.
+ * @param retVal Return value flag.
* @param isolation Transaction isolation.
* @param isInvalidate Invalidation flag.
* @param timeout Lock timeout.
@@ -108,6 +112,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
* @param taskNameHash Task name hash code.
* @param accessTtl TTL for read operation.
* @param skipStore Skip store flag.
+ * @param firstClientReq {@code True} if first lock request for lock operation sent from client node.
*/
public GridNearLockRequest(
int cacheId,
@@ -130,7 +135,8 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
@Nullable UUID subjId,
int taskNameHash,
long accessTtl,
- boolean skipStore
+ boolean skipStore,
+ boolean firstClientReq
) {
super(
cacheId,
@@ -158,11 +164,19 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
this.taskNameHash = taskNameHash;
this.accessTtl = accessTtl;
this.retVal = retVal;
+ this.firstClientReq = firstClientReq;
dhtVers = new GridCacheVersion[keyCnt];
}
/**
+ * @return {@code True} if first lock request for lock operation sent from client node.
+ */
+ public boolean firstClientRequest() {
+ return firstClientReq;
+ }
+
+ /**
* @return Topology version.
*/
@Override public AffinityTopologyVersion topologyVersion() {
@@ -368,60 +382,66 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
writer.incrementState();
case 24:
- if (!writer.writeBoolean("hasTransforms", hasTransforms))
+ if (!writer.writeBoolean("firstClientReq", firstClientReq))
return false;
writer.incrementState();
case 25:
- if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx))
+ if (!writer.writeBoolean("hasTransforms", hasTransforms))
return false;
writer.incrementState();
case 26:
- if (!writer.writeBoolean("implicitTx", implicitTx))
+ if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx))
return false;
writer.incrementState();
case 27:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeBoolean("implicitTx", implicitTx))
return false;
writer.incrementState();
case 28:
- if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit))
+ if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
case 29:
- if (!writer.writeBoolean("retVal", retVal))
+ if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit))
return false;
writer.incrementState();
case 30:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeBoolean("retVal", retVal))
return false;
writer.incrementState();
case 31:
- if (!writer.writeBoolean("syncCommit", syncCommit))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 32:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeBoolean("syncCommit", syncCommit))
return false;
writer.incrementState();
case 33:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 34:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -468,7 +488,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
case 24:
- hasTransforms = reader.readBoolean("hasTransforms");
+ firstClientReq = reader.readBoolean("firstClientReq");
if (!reader.isLastRead())
return false;
@@ -476,7 +496,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
case 25:
- implicitSingleTx = reader.readBoolean("implicitSingleTx");
+ hasTransforms = reader.readBoolean("hasTransforms");
if (!reader.isLastRead())
return false;
@@ -484,7 +504,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
case 26:
- implicitTx = reader.readBoolean("implicitTx");
+ implicitSingleTx = reader.readBoolean("implicitSingleTx");
if (!reader.isLastRead())
return false;
@@ -492,7 +512,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
case 27:
- miniId = reader.readIgniteUuid("miniId");
+ implicitTx = reader.readBoolean("implicitTx");
if (!reader.isLastRead())
return false;
@@ -500,7 +520,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
case 28:
- onePhaseCommit = reader.readBoolean("onePhaseCommit");
+ miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
return false;
@@ -508,7 +528,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
case 29:
- retVal = reader.readBoolean("retVal");
+ onePhaseCommit = reader.readBoolean("onePhaseCommit");
if (!reader.isLastRead())
return false;
@@ -516,7 +536,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
case 30:
- subjId = reader.readUuid("subjId");
+ retVal = reader.readBoolean("retVal");
if (!reader.isLastRead())
return false;
@@ -524,7 +544,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
case 31:
- syncCommit = reader.readBoolean("syncCommit");
+ subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
return false;
@@ -532,7 +552,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
case 32:
- taskNameHash = reader.readInt("taskNameHash");
+ syncCommit = reader.readBoolean("syncCommit");
if (!reader.isLastRead())
return false;
@@ -540,6 +560,14 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
case 33:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 34:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -559,7 +587,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 34;
+ return 35;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d413965/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
index 20928de..f324198 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import org.apache.ignite.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.version.*;
@@ -58,6 +59,9 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
/** Filter evaluation results for fast-commit transactions. */
private boolean[] filterRes;
+ /** If not {@code null}, topology version client node should wait for before remapping lock request. */
+ private AffinityTopologyVersion clientRemapVer;
+
/**
* Empty constructor (required by {@link Externalizable}).
*/
@@ -73,6 +77,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
* @param filterRes {@code True} if need to allocate array for filter evaluation results.
* @param cnt Count.
* @param err Error.
+ * @param clientRemapVer If not {@code null}, topology version client node should wait for before remapping.
*/
public GridNearLockResponse(
int cacheId,
@@ -81,13 +86,15 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
IgniteUuid miniId,
boolean filterRes,
int cnt,
- Throwable err
+ Throwable err,
+ AffinityTopologyVersion clientRemapVer
) {
super(cacheId, lockVer, futId, cnt, err);
assert miniId != null;
this.miniId = miniId;
+ this.clientRemapVer = clientRemapVer;
dhtVers = new GridCacheVersion[cnt];
mappedVers = new GridCacheVersion[cnt];
@@ -97,6 +104,13 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
}
/**
+ * @return If not {@code null}, topology version client node should wait for before remapping lock request.
+ */
+ @Nullable public AffinityTopologyVersion clientRemapVersion() {
+ return clientRemapVer;
+ }
+
+ /**
* Gets pending versions that are less than {@link #version()}.
*
* @return Pending versions.
@@ -192,30 +206,36 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
switch (writer.state()) {
case 11:
- if (!writer.writeObjectArray("dhtVers", dhtVers, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("clientRemapVer", clientRemapVer))
return false;
writer.incrementState();
case 12:
- if (!writer.writeBooleanArray("filterRes", filterRes))
+ if (!writer.writeObjectArray("dhtVers", dhtVers, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 13:
- if (!writer.writeObjectArray("mappedVers", mappedVers, MessageCollectionItemType.MSG))
+ if (!writer.writeBooleanArray("filterRes", filterRes))
return false;
writer.incrementState();
case 14:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeObjectArray("mappedVers", mappedVers, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 15:
+ if (!writer.writeIgniteUuid("miniId", miniId))
+ return false;
+
+ writer.incrementState();
+
+ case 16:
if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
return false;
@@ -238,7 +258,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
switch (reader.state()) {
case 11:
- dhtVers = reader.readObjectArray("dhtVers", MessageCollectionItemType.MSG, GridCacheVersion.class);
+ clientRemapVer = reader.readMessage("clientRemapVer");
if (!reader.isLastRead())
return false;
@@ -246,7 +266,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
reader.incrementState();
case 12:
- filterRes = reader.readBooleanArray("filterRes");
+ dhtVers = reader.readObjectArray("dhtVers", MessageCollectionItemType.MSG, GridCacheVersion.class);
if (!reader.isLastRead())
return false;
@@ -254,7 +274,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
reader.incrementState();
case 13:
- mappedVers = reader.readObjectArray("mappedVers", MessageCollectionItemType.MSG, GridCacheVersion.class);
+ filterRes = reader.readBooleanArray("filterRes");
if (!reader.isLastRead())
return false;
@@ -262,7 +282,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
reader.incrementState();
case 14:
- miniId = reader.readIgniteUuid("miniId");
+ mappedVers = reader.readObjectArray("mappedVers", MessageCollectionItemType.MSG, GridCacheVersion.class);
if (!reader.isLastRead())
return false;
@@ -270,6 +290,14 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
reader.incrementState();
case 15:
+ miniId = reader.readIgniteUuid("miniId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 16:
pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -289,7 +317,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 16;
+ return 17;
}
/** {@inheritDoc} */