You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/21 13:30:13 UTC
incubator-ignite git commit: # ignite-23 remap for tx updates from
client
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-23 12761e440 -> d12dd4173
# ignite-23 remap for tx updates from client
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d12dd417
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d12dd417
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d12dd417
Branch: refs/heads/ignite-23
Commit: d12dd41736ead57d0f68aa211edde0b4922733fc
Parents: 12761e4
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 21 11:54:43 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 21 14:29:58 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheIoManager.java | 5 +-
.../GridCachePartitionExchangeManager.java | 44 ++--
.../distributed/GridDistributedTxMapping.java | 17 ++
.../distributed/dht/GridDhtLockFuture.java | 10 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 10 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 8 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 3 +-
.../dht/colocated/GridDhtColocatedCache.java | 12 +-
.../colocated/GridDhtColocatedLockFuture.java | 8 +-
.../GridDhtPartitionsExchangeFuture.java | 16 +-
.../distributed/near/GridNearLockRequest.java | 1 +
.../near/GridNearOptimisticTxPrepareFuture.java | 54 +++--
.../GridNearPessimisticTxPrepareFuture.java | 5 +-
.../cache/distributed/near/GridNearTxLocal.java | 16 ++
.../near/GridNearTxPrepareRequest.java | 72 +++++--
.../near/GridNearTxPrepareResponse.java | 68 +++++--
.../cache/transactions/IgniteInternalTx.java | 5 +
.../cache/transactions/IgniteTxAdapter.java | 15 +-
.../cache/transactions/IgniteTxHandler.java | 117 ++++++++---
...niteCacheClientNodeChangingTopologyTest.java | 200 ++++++++++++++++++-
20 files changed, 532 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 02f16c0..b8a4c87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -488,7 +488,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
req.miniId(),
req.version(),
req.version(),
- null, null, null);
+ null,
+ null,
+ null,
+ false);
res.error(req.classError());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 25e18db..41a13ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -706,9 +706,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
GridDhtPartitionsExchangeFuture old = exchFuts.addx(
fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs));
- if (old != null)
+ if (old != null) {
fut = old;
+ if (reqs != null)
+ fut.cacheChangeRequests(reqs);
+ }
+
if (discoEvt != null)
fut.onEvent(exchId, discoEvt);
@@ -870,17 +874,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
else {
if (msg.client()) {
- IgniteInternalFuture<?> fut = affinityReadyFuture(msg.exchangeId().topologyVersion());
-
- if (fut != null) {
- fut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
- processSinglePartitionClientUpdate(node, msg);
- }
- });
- }
- else
- processSinglePartitionClientUpdate(node, msg);
+ final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
+ null,
+ null);
+
+ exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ // Finished future should reply only to sender client node.
+ exchFut.onReceive(node.id(), msg);
+ }
+ });
}
else
exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
@@ -892,23 +895,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param node Node.
- * @param msg Message.
- */
- private void processSinglePartitionClientUpdate(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
- final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
- null,
- null);
-
- exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
- // Finished future should reply only to sender client node.
- exchFut.onReceive(node.id(), msg);
- }
- });
- }
-
- /**
* @param node Node ID.
* @param msg Message.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index fded3c9..bd1dedf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -63,6 +63,9 @@ public class GridDistributedTxMapping implements Externalizable {
/** {@code True} if mapping is for near caches, {@code false} otherwise. */
private boolean near;
+ /** {@code True} if this is first mapping for optimistic tx on client node. */
+ private boolean clientFirst;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -108,6 +111,20 @@ public class GridDistributedTxMapping implements Externalizable {
}
/**
+ * @return {@code True} if this is first mapping for optimistic tx on client node.
+ */
+ public boolean clientFirst() {
+ return clientFirst;
+ }
+
+ /**
+ * @param clientFirst {@code True} if this is first mapping for optimistic tx on client node.
+ */
+ public void clientFirst(boolean clientFirst) {
+ this.clientFirst = clientFirst;
+ }
+
+ /**
* @return {@code True} if mapping is for near caches, {@code false} otherwise.
*/
public boolean near() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index c57eded..bdaa552 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -47,7 +47,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*;
/**
* Cache lock future.
*/
-public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
+public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
implements GridCacheMvccFuture<Boolean>, GridDhtFuture<Boolean>, GridCacheMappedVersion {
/** */
private static final long serialVersionUID = 0L;
@@ -60,7 +60,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
/** Cache registry. */
@GridToStringExclude
- private GridCacheContext<K, V> cctx;
+ private GridCacheContext<?, ?> cctx;
/** Near node ID. */
private UUID nearNodeId;
@@ -151,7 +151,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
* @param skipStore Skip store flag.
*/
public GridDhtLockFuture(
- GridCacheContext<K, V> cctx,
+ GridCacheContext<?, ?> cctx,
UUID nearNodeId,
GridCacheVersion nearLockVer,
@NotNull AffinityTopologyVersion topVer,
@@ -221,7 +221,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
* @param cacheCtx Cache context.
* @param invalidPart Partition to retry.
*/
- void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int invalidPart) {
+ void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, int invalidPart) {
invalidParts.add(invalidPart);
// Register invalid partitions with transaction.
@@ -1170,7 +1170,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
* @param entries Entries to check.
*/
@SuppressWarnings({"ForLoopReplaceableByForEach"})
- private void evictReaders(GridCacheContext<K, V> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId,
+ private void evictReaders(GridCacheContext<?, ?> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId,
@Nullable List<GridDhtCacheEntry> entries) {
if (entries == null || keys == null || entries.isEmpty() || keys.isEmpty())
return;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 26eef50..bda0868 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -518,7 +518,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
return;
}
- // Group lock can be only started from local node, so we never start group lock transaction on remote node.
IgniteInternalFuture<?> f = lockAllAsync(ctx, nearNode, req, null);
// Register listener just so we print out errors.
@@ -534,8 +533,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse res) {
assert nodeId != null;
assert res != null;
- GridDhtLockFuture<K, V> fut = (GridDhtLockFuture<K, V>)ctx.mvcc().<Boolean>future(res.version(),
- res.futureId());
+ GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>future(res.version(), res.futureId());
if (fut == null) {
if (log.isDebugEnabled())
@@ -604,7 +602,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
assert tx != null;
- GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(
+ GridDhtLockFuture fut = new GridDhtLockFuture(
ctx,
tx.nearNodeId(),
tx.nearXidVersion(),
@@ -719,10 +717,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (filter == null)
filter = req.filter();
- GridDhtLockFuture<K, V> fut = null;
+ GridDhtLockFuture fut = null;
if (!req.inTx()) {
- fut = new GridDhtLockFuture<>(ctx,
+ fut = new GridDhtLockFuture(ctx,
nearNode.id(),
req.version(),
req.topologyVersion(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 54b59b8..90edb0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -52,15 +52,13 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
private static final long serialVersionUID = 0L;
/** Near mappings. */
- protected Map<UUID, GridDistributedTxMapping> nearMap =
- new ConcurrentHashMap8<>();
+ protected Map<UUID, GridDistributedTxMapping> nearMap = new ConcurrentHashMap8<>();
/** DHT mappings. */
- protected Map<UUID, GridDistributedTxMapping> dhtMap =
- new ConcurrentHashMap8<>();
+ protected Map<UUID, GridDistributedTxMapping> dhtMap = new ConcurrentHashMap8<>();
/** Mapped flag. */
- private AtomicBoolean mapped = new AtomicBoolean();
+ protected AtomicBoolean mapped = new AtomicBoolean();
/** */
private long dhtThreadId;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 293cf95..aed00ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -582,7 +582,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
tx.writeVersion(),
tx.invalidPartitions(),
ret,
- prepErr);
+ prepErr,
+ false);
if (prepErr == null) {
addDhtValues(res);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 05b3c7b..221b230 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -362,13 +362,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
@Nullable TransactionIsolation isolation,
long accessTtl
) {
- assert tx == null || tx instanceof GridNearTxLocal;
+ assert tx == null || tx instanceof GridNearTxLocal : tx;
GridNearTxLocal txx = (GridNearTxLocal)tx;
CacheOperationContext opCtx = ctx.operationContextPerCall();
- GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx,
+ GridDhtColocatedLockFuture fut = new GridDhtColocatedLockFuture(ctx,
keys,
txx,
isRead,
@@ -619,7 +619,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @return Lock future.
*/
IgniteInternalFuture<Exception> lockAllAsync(
- final GridCacheContext<K, V> cacheCtx,
+ final GridCacheContext<?, ?> cacheCtx,
@Nullable final GridNearTxLocal tx,
final long threadId,
final GridCacheVersion ver,
@@ -700,7 +700,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @return Lock future.
*/
private IgniteInternalFuture<Exception> lockAllAsync0(
- GridCacheContext<K, V> cacheCtx,
+ GridCacheContext<?, ?> cacheCtx,
@Nullable final GridNearTxLocal tx,
long threadId,
final GridCacheVersion ver,
@@ -715,7 +715,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
int cnt = keys.size();
if (tx == null) {
- GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(ctx,
+ GridDhtLockFuture fut = new GridDhtLockFuture(ctx,
ctx.localNodeId(),
ver,
topVer,
@@ -838,7 +838,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
assert nodeId != null;
assert res != null;
- GridDhtColocatedLockFuture<K, V> fut = (GridDhtColocatedLockFuture<K, V>)ctx.mvcc().
+ GridDhtColocatedLockFuture fut = (GridDhtColocatedLockFuture)ctx.mvcc().
<Boolean>future(res.version(), res.futureId());
if (fut != null)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 372c517..9d19152 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -46,7 +46,7 @@ import static org.apache.ignite.events.EventType.*;
/**
* Colocated cache lock future.
*/
-public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
+public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture<Boolean>
implements GridCacheFuture<Boolean> {
/** */
private static final long serialVersionUID = 0L;
@@ -59,7 +59,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
/** Cache registry. */
@GridToStringExclude
- private GridCacheContext<K, V> cctx;
+ private GridCacheContext<?, ?> cctx;
/** Lock owner thread. */
@GridToStringInclude
@@ -121,10 +121,10 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
* @param timeout Lock acquisition timeout.
* @param accessTtl TTL for read operation.
* @param filter Filter.
- * @param skipStore
+ * @param skipStore Skip store flag.
*/
public GridDhtColocatedLockFuture(
- GridCacheContext<K, V> cctx,
+ GridCacheContext<?, ?> cctx,
Collection<KeyCacheObject> keys,
@Nullable GridNearTxLocal tx,
boolean read,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 94ca540..af7fa5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -229,15 +229,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
initFut = new GridFutureAdapter<>();
- // Grab all nodes with order of equal or less than last joined node.
- ClusterNode node = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion());
-
- oldestNode.set(node);
-
if (log.isDebugEnabled())
log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + ", fut=" + this + ']');
}
+ /**
+ * @param reqs Cache change requests.
+ */
+ public void cacheChangeRequests(Collection<DynamicCacheChangeRequest> reqs) {
+ this.reqs = reqs;
+ }
+
/** {@inheritDoc} */
@Override public GridDiscoveryTopologySnapshot topologySnapshot() throws IgniteCheckedException {
get();
@@ -461,6 +463,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
assert discoEvt != null : this;
assert !dummy && !forcePreload : this;
+ ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion());
+
+ oldestNode.set(oldest);
+
startCaches();
// True if client node joined or failed.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index e71dd65..35ef2e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -98,6 +98,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
* @param implicitTx Flag to indicate that transaction is implicit.
* @param implicitSingleTx Implicit-transaction-with-one-key flag.
* @param isRead Indicates whether implicit lock is for read or write operation.
+ * @param retVal Return value flag.
* @param isolation Transaction isolation.
* @param isInvalidate Invalidation flag.
* @param timeout Lock timeout.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 4f74303..713b713 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -221,18 +221,18 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
if (topVer != null) {
tx.topologyVersion(topVer);
- prepare0();
+ prepare0(false);
return;
}
- prepareOnTopology();
+ prepareOnTopology(false);
}
/**
- *
+ * @param remap Remap flag.
*/
- private void prepareOnTopology() {
+ private void prepareOnTopology(final boolean remap) {
GridDhtTopologyFuture topFut = topologyReadLock();
try {
@@ -265,16 +265,19 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
return;
}
- tx.topologyVersion(topFut.topologyVersion());
+ if (remap)
+ tx.onRemap(topFut.topologyVersion());
+ else
+ tx.topologyVersion(topFut.topologyVersion());
- prepare0();
+ prepare0(remap);
}
else {
topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
- prepareOnTopology();
+ prepareOnTopology(remap);
}
});
}
@@ -346,10 +349,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
/**
* Initializes future.
+ *
+ * @param remap Remap flag.
*/
- private void prepare0() {
+ private void prepare0(boolean remap) {
try {
- if (!tx.state(PREPARING)) {
+ boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING);
+
+ if (!txStateCheck) {
if (tx.setRollbackOnly()) {
if (tx.timedOut())
onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
@@ -366,7 +373,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
}
// Make sure to add future before calling prepare.
- cctx.mvcc().addFuture(this);
+ if (!remap)
+ cctx.mvcc().addFuture(this);
prepare(
tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
@@ -502,7 +510,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
tx.implicitSingle(),
m.explicitLock(),
tx.subjectId(),
- tx.taskNameHash());
+ tx.taskNameHash(),
+ m.clientFirst());
for (IgniteTxEntry txEntry : m.writes()) {
if (txEntry.op() == TRANSFORM)
@@ -560,13 +569,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
* @param entry Transaction entry.
* @param topVer Topology version.
* @param cur Current mapping.
+ * @param waitLock Wait lock flag.
* @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key.
* @return Mapping.
*/
private GridDistributedTxMapping map(
IgniteTxEntry entry,
AffinityTopologyVersion topVer,
- GridDistributedTxMapping cur,
+ @Nullable GridDistributedTxMapping cur,
boolean waitLock
) throws IgniteCheckedException {
GridCacheContext cacheCtx = entry.context();
@@ -599,10 +609,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
}
if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
+ boolean clientFirst = cur == null && cctx.kernalContext().clientNode();
+
cur = new GridDistributedTxMapping(primary);
// Initialize near flag right away.
cur.near(cacheCtx.isNear());
+
+ cur.clientFirst(clientFirst);
}
cur.add(entry);
@@ -748,11 +762,19 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
onError(nodeId, mappings, res.error());
}
else {
- onPrepareResponse(m, res);
+ if (res.clientRemap()) {
+ assert cctx.kernalContext().clientNode();
+ assert m.clientFirst();
+
+ prepareOnTopology(true);
+ }
+ else {
+ onPrepareResponse(m, res);
- // Proceed prepare before finishing mini future.
- if (mappings != null)
- proceedPrepare(mappings);
+ // Proceed prepare before finishing mini future.
+ if (mappings != null)
+ proceedPrepare(mappings);
+ }
// Finish this mini future.
onDone(tx);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index bce62c1..1d06860 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -84,6 +84,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
/** {@inheritDoc} */
@Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
if (!isDone()) {
+ assert !res.clientRemap() : res;
+
for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
MiniFuture f = (MiniFuture)fut;
@@ -187,7 +189,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
tx.implicitSingle(),
m.explicitLock(),
tx.subjectId(),
- tx.taskNameHash());
+ tx.taskNameHash(),
+ false);
for (IgniteTxEntry txEntry : m.writes()) {
if (txEntry.op() == TRANSFORM)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index c38965d..e7664c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1185,6 +1185,22 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
/** {@inheritDoc} */
+ @Override public void onRemap(AffinityTopologyVersion topVer) {
+ assert cctx.kernalContext().clientNode();
+
+ mapped.set(false);
+ nearLocallyMapped = false;
+ colocatedLocallyMapped = false;
+ txNodes = null;
+ onePhaseCommit = false;
+ nearMap.clear();
+ dhtMap.clear();
+ mappings.clear();
+
+ this.topVer.set(topVer);
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNearTxLocal.class, this, "mappings", mappings.keySet(), "super", super.toString());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index a08637d..b602a7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -75,6 +75,9 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** Task name hash. */
private int taskNameHash;
+ /** {@code True} if first optimistic tx prepare request sent from client node. */
+ private boolean firstClientReq;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -92,8 +95,13 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
* @param txNodes Transaction nodes mapping.
* @param last {@code True} if this last prepare request for node.
* @param lastBackups IDs of backup nodes receiving last prepare request during this prepare.
+ * @param onePhaseCommit One phase commit flag.
+ * @param retVal Return value flag.
+ * @param implicitSingle Implicit single flag.
+ * @param explicitLock Explicit lock flag.
* @param subjId Subject ID.
* @param taskNameHash Task name hash.
+ * @param firstClientReq {@code True} if first optimistic tx prepare request sent from client node.
*/
public GridNearTxPrepareRequest(
IgniteUuid futId,
@@ -110,11 +118,13 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
boolean implicitSingle,
boolean explicitLock,
@Nullable UUID subjId,
- int taskNameHash
+ int taskNameHash,
+ boolean firstClientReq
) {
super(tx, reads, writes, txNodes, onePhaseCommit);
assert futId != null;
+ assert !firstClientReq || tx.optimistic() : tx;
this.futId = futId;
this.topVer = topVer;
@@ -126,6 +136,14 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
this.explicitLock = explicitLock;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
+ this.firstClientReq = firstClientReq;
+ }
+
+ /**
+ * @return {@code True} if first optimistic tx prepare request sent from client node.
+ */
+ public boolean firstClientRequest() {
+ return firstClientReq;
}
/**
@@ -273,60 +291,66 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
writer.incrementState();
case 24:
- if (!writer.writeIgniteUuid("futId", futId))
+ if (!writer.writeBoolean("firstClientReq", firstClientReq))
return false;
writer.incrementState();
case 25:
- if (!writer.writeBoolean("implicitSingle", implicitSingle))
+ if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
case 26:
- if (!writer.writeBoolean("last", last))
+ if (!writer.writeBoolean("implicitSingle", implicitSingle))
return false;
writer.incrementState();
case 27:
- if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID))
+ if (!writer.writeBoolean("last", last))
return false;
writer.incrementState();
case 28:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID))
return false;
writer.incrementState();
case 29:
- if (!writer.writeBoolean("near", near))
+ if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
case 30:
- if (!writer.writeBoolean("retVal", retVal))
+ if (!writer.writeBoolean("near", near))
return false;
writer.incrementState();
case 31:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeBoolean("retVal", retVal))
return false;
writer.incrementState();
case 32:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 33:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 34:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -357,7 +381,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 24:
- futId = reader.readIgniteUuid("futId");
+ firstClientReq = reader.readBoolean("firstClientReq");
if (!reader.isLastRead())
return false;
@@ -365,7 +389,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 25:
- implicitSingle = reader.readBoolean("implicitSingle");
+ futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
return false;
@@ -373,7 +397,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 26:
- last = reader.readBoolean("last");
+ implicitSingle = reader.readBoolean("implicitSingle");
if (!reader.isLastRead())
return false;
@@ -381,7 +405,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 27:
- lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID);
+ last = reader.readBoolean("last");
if (!reader.isLastRead())
return false;
@@ -389,7 +413,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 28:
- miniId = reader.readIgniteUuid("miniId");
+ lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID);
if (!reader.isLastRead())
return false;
@@ -397,7 +421,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 29:
- near = reader.readBoolean("near");
+ miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
return false;
@@ -405,7 +429,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 30:
- retVal = reader.readBoolean("retVal");
+ near = reader.readBoolean("near");
if (!reader.isLastRead())
return false;
@@ -413,7 +437,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 31:
- subjId = reader.readUuid("subjId");
+ retVal = reader.readBoolean("retVal");
if (!reader.isLastRead())
return false;
@@ -421,7 +445,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 32:
- taskNameHash = reader.readInt("taskNameHash");
+ subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
return false;
@@ -429,6 +453,14 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 33:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 34:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -448,7 +480,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 34;
+ return 35;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index f8c07f7..e4dfd4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -83,6 +83,9 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
@GridDirectCollection(IgniteTxKey.class)
private Collection<IgniteTxKey> filterFailedKeys;
+ /** */
+ private boolean clientRemap;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -95,9 +98,11 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
* @param futId Future ID.
* @param miniId Mini future ID.
* @param dhtVer DHT version.
+ * @param writeVer Write version.
* @param invalidParts Invalid partitions.
* @param retVal Return value.
* @param err Error.
+ * @param clientRemap {@code True} if client node should remap transaction.
*/
public GridNearTxPrepareResponse(
GridCacheVersion xid,
@@ -107,7 +112,8 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
GridCacheVersion writeVer,
Collection<Integer> invalidParts,
GridCacheReturn retVal,
- Throwable err
+ Throwable err,
+ boolean clientRemap
) {
super(xid, err);
@@ -121,6 +127,14 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
this.writeVer = writeVer;
this.invalidParts = invalidParts;
this.retVal = retVal;
+ this.clientRemap = clientRemap;
+ }
+
+ /**
+ * @return {@code True} if client node should remap transaction.
+ */
+ public boolean clientRemap() {
+ return clientRemap;
}
/**
@@ -330,60 +344,66 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
switch (writer.state()) {
case 10:
- if (!writer.writeMessage("dhtVer", dhtVer))
+ if (!writer.writeBoolean("clientRemap", clientRemap))
return false;
writer.incrementState();
case 11:
- if (!writer.writeCollection("filterFailedKeys", filterFailedKeys, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("dhtVer", dhtVer))
return false;
writer.incrementState();
case 12:
- if (!writer.writeIgniteUuid("futId", futId))
+ if (!writer.writeCollection("filterFailedKeys", filterFailedKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 13:
- if (!writer.writeCollection("invalidParts", invalidParts, MessageCollectionItemType.INT))
+ if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
case 14:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeCollection("invalidParts", invalidParts, MessageCollectionItemType.INT))
return false;
writer.incrementState();
case 15:
- if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG))
+ if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
case 16:
- if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG))
+ if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 17:
- if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
+ if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 18:
- if (!writer.writeMessage("retVal", retVal))
+ if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 19:
+ if (!writer.writeMessage("retVal", retVal))
+ return false;
+
+ writer.incrementState();
+
+ case 20:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -406,7 +426,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
switch (reader.state()) {
case 10:
- dhtVer = reader.readMessage("dhtVer");
+ clientRemap = reader.readBoolean("clientRemap");
if (!reader.isLastRead())
return false;
@@ -414,7 +434,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
case 11:
- filterFailedKeys = reader.readCollection("filterFailedKeys", MessageCollectionItemType.MSG);
+ dhtVer = reader.readMessage("dhtVer");
if (!reader.isLastRead())
return false;
@@ -422,7 +442,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
case 12:
- futId = reader.readIgniteUuid("futId");
+ filterFailedKeys = reader.readCollection("filterFailedKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -430,7 +450,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
case 13:
- invalidParts = reader.readCollection("invalidParts", MessageCollectionItemType.INT);
+ futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
return false;
@@ -438,7 +458,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
case 14:
- miniId = reader.readIgniteUuid("miniId");
+ invalidParts = reader.readCollection("invalidParts", MessageCollectionItemType.INT);
if (!reader.isLastRead())
return false;
@@ -446,7 +466,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
case 15:
- ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG);
+ miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
return false;
@@ -454,7 +474,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
case 16:
- ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG);
+ ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -462,7 +482,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
case 17:
- pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
+ ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -470,7 +490,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
case 18:
- retVal = reader.readMessage("retVal");
+ pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -478,6 +498,14 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
case 19:
+ retVal = reader.readMessage("retVal");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 20:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -497,7 +525,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 20;
+ return 21;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 5f877ec..cb86e0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -706,4 +706,9 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
* @return Public API proxy.
*/
public TransactionProxy proxy();
+
+ /**
+ * @param topVer New topology version.
+ */
+ public void onRemap(AffinityTopologyVersion topVer);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index eb8825e..8cb9cc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -184,7 +184,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
private AtomicReference<GridFutureAdapter<IgniteInternalTx>> finFut = new AtomicReference<>();
/** Topology version. */
- private AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>(AffinityTopologyVersion.NONE);
+ protected AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>(AffinityTopologyVersion.NONE);
/** Mutex. */
private final Lock lock = new ReentrantLock();
@@ -493,13 +493,17 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
+ @Override public void onRemap(AffinityTopologyVersion topVer) {
+ assert false;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean hasTransforms() {
return transform;
}
/** {@inheritDoc} */
- @Override
- public boolean markPreparing() {
+ @Override public boolean markPreparing() {
return preparing.compareAndSet(false, true);
}
@@ -1716,6 +1720,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
+ @Override public void onRemap(AffinityTopologyVersion topVer) {
+ throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
+ }
+
+ /** {@inheritDoc} */
@Override public boolean empty() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index f466bf2..f815a73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -229,14 +229,22 @@ public class IgniteTxHandler {
return null;
}
+ IgniteTxEntry firstEntry = null;
+
try {
- for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes()))
+ for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) {
e.unmarshal(ctx, false, ctx.deploy().globalLoader());
+
+ if (firstEntry == null)
+ firstEntry = e;
+ }
}
catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}
+ assert firstEntry != null : req;
+
GridDhtTxLocal tx;
GridCacheVersion mappedVer = ctx.tm().mappedVersion(req.version());
@@ -253,36 +261,87 @@ public class IgniteTxHandler {
}
}
else {
- tx = new GridDhtTxLocal(
- ctx,
- nearNode.id(),
- req.version(),
- req.futureId(),
- req.miniId(),
- req.threadId(),
- req.implicitSingle(),
- req.implicitSingle(),
- req.system(),
- req.explicitLock(),
- req.policy(),
- req.concurrency(),
- req.isolation(),
- req.timeout(),
- req.isInvalidate(),
- false,
- req.txSize(),
- req.transactionNodes(),
- req.subjectId(),
- req.taskNameHash()
- );
+ GridDhtPartitionTopology top = null;
- tx = ctx.tm().onCreated(null, tx);
+ if (req.firstClientRequest()) {
+ assert req.concurrency().equals(OPTIMISTIC) : req;
- if (tx != null)
- tx.topologyVersion(req.topologyVersion());
- else
- U.warn(log, "Failed to create local transaction (was transaction rolled back?) [xid=" +
- req.version() + ", req=" + req + ']');
+ top = firstEntry.context().topology();
+
+ top.readLock();
+ }
+
+ try {
+ if (top != null && !top.topologyVersion().equals(req.topologyVersion())) {
+ if (log.isDebugEnabled()) {
+ log.debug("Client topology version mismatch, need remap transaction [" +
+ "reqTopVer=" + req.topologyVersion() +
+ ", locTopVer=" + top.topologyVersion() +
+ ", req=" + req + ']');
+ }
+
+ GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.version(),
+ req.version(),
+ null,
+ null,
+ null,
+ true);
+
+ try {
+ ctx.io().send(nearNode, res, req.policy());
+ }
+ catch (ClusterTopologyCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send client tx remap response, client node failed " +
+ "[node=" + nearNode + ", req=" + req + ']');
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send client tx remap response " +
+ "[node=" + nearNode + ", req=" + req + ']', e);
+ }
+
+ return new GridFinishedFuture<>(res);
+ }
+
+ tx = new GridDhtTxLocal(
+ ctx,
+ nearNode.id(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.threadId(),
+ req.implicitSingle(),
+ req.implicitSingle(),
+ req.system(),
+ req.explicitLock(),
+ req.policy(),
+ req.concurrency(),
+ req.isolation(),
+ req.timeout(),
+ req.isInvalidate(),
+ false,
+ req.txSize(),
+ req.transactionNodes(),
+ req.subjectId(),
+ req.taskNameHash()
+ );
+
+ tx = ctx.tm().onCreated(null, tx);
+
+ if (tx != null)
+ tx.topologyVersion(req.topologyVersion());
+ else
+ U.warn(log, "Failed to create local transaction (was transaction rolled back?) [xid=" +
+ req.version() + ", req=" + req + ']');
+ }
+ finally {
+ if (top != null)
+ top.readUnlock();
+ }
}
if (tx != null) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d12dd417/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index f964d39..3b80c2f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -38,6 +38,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.testframework.*;
import org.apache.ignite.testframework.junits.common.*;
import org.apache.ignite.transactions.*;
+import org.jetbrains.annotations.*;
import javax.cache.*;
import java.util.*;
@@ -228,7 +229,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
final Map<Integer, Integer> map = new HashMap<>();
- for (int i = 0; i < 1; i++)
+ for (int i = 0; i < 100; i++)
map.put(i, i);
TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
@@ -252,7 +253,76 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
IgniteEx ignite3 = startGrid(3);
- log.info("Stop block1.");
+ log.info("Stop block.");
+
+ spi.stopBlock();
+
+ putFut.get();
+
+ checkData(map, 4);
+
+ map.clear();
+
+ for (int i = 0; i < 100; i++)
+ map.put(i, i + 1);
+
+ cache.putAll(map);
+
+ checkData(map, 4);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void _testPessimisticTx() throws Exception {
+ ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setBackups(0);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setRebalanceMode(SYNC);
+
+ IgniteEx ignite0 = startGrid(0);
+ IgniteEx ignite1 = startGrid(1);
+
+ client = true;
+
+ final Ignite ignite2 = startGrid(2);
+
+ assertTrue(ignite2.configuration().isClientMode());
+
+ final Map<Integer, Integer> map = new HashMap<>();
+
+ for (int i = 0; i < 1; i++)
+ map.put(i, i);
+
+ TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
+
+ spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id());
+ spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id());
+
+ final IgniteCache<Integer, Integer> cache = ignite2.cache(null);
+
+ IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try (Transaction tx = ignite2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.putAll(map);
+
+ tx.commit();
+ }
+
+ return null;
+ }
+ });
+
+ assertFalse(putFut.isDone());
+
+ client = false;
+
+ IgniteEx ignite3 = startGrid(3);
+
+ log.info("Stop block.");
spi.stopBlock();
@@ -264,6 +334,99 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
/**
* @throws Exception If failed.
*/
+ public void testOptimisticTxMessageClientFirstFlag() throws Exception {
+ ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setBackups(1);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setRebalanceMode(SYNC);
+
+ IgniteEx ignite0 = startGrid(0);
+ IgniteEx ignite1 = startGrid(1);
+ IgniteEx ignite2 = startGrid(2);
+
+ client = true;
+
+ Ignite ignite3 = startGrid(3);
+
+ assertTrue(ignite3.configuration().isClientMode());
+
+ TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi();
+
+ IgniteCache<Integer, Integer> cache = ignite3.cache(null);
+
+ List<Integer> keys0 = primaryKeys(ignite0.cache(null), 2, 0);
+ List<Integer> keys1 = primaryKeys(ignite1.cache(null), 2, 0);
+ List<Integer> keys2 = primaryKeys(ignite2.cache(null), 2, 0);
+
+ LinkedHashMap<Integer, Integer> map = new LinkedHashMap<>();
+
+ map.put(keys0.get(0), 1);
+ map.put(keys1.get(0), 2);
+ map.put(keys2.get(0), 3);
+ map.put(keys0.get(1), 4);
+ map.put(keys1.get(1), 5);
+ map.put(keys2.get(1), 6);
+
+ spi.record(GridNearTxPrepareRequest.class);
+
+ try (Transaction tx = ignite3.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
+ for (Map.Entry<Integer, Integer> e : map.entrySet())
+ cache.put(e.getKey(), e.getValue());
+
+ tx.commit();
+ }
+
+ checkClientPrepareMessages(spi.recordedMessages(), 6);
+
+ checkData(map, 4);
+
+ cache.putAll(map);
+
+ checkClientPrepareMessages(spi.recordedMessages(), 6);
+
+ spi.record(null);
+
+ checkData(map, 4);
+
+ IgniteCache<Integer, Integer> cache0 = ignite0.cache(null);
+
+ TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi();
+
+ spi0.record(GridNearTxPrepareRequest.class);
+
+ cache0.putAll(map);
+
+ spi0.record(null);
+
+ List<Object> msgs = spi0.recordedMessages();
+
+ assertEquals(4, msgs.size());
+
+ for (Object msg : msgs)
+ assertFalse(((GridNearTxPrepareRequest)msg).firstClientRequest());
+
+ checkData(map, 4);
+ }
+
+ /**
+ * @param msgs Messages.
+ * @param expCnt Expected number of messages.
+ */
+ private void checkClientPrepareMessages(List<Object> msgs, int expCnt) {
+ assertEquals(expCnt, msgs.size());
+
+ assertTrue(((GridNearTxPrepareRequest)msgs.get(0)).firstClientRequest());
+
+ for (int i = 1; i < msgs.size(); i++)
+ assertFalse(((GridNearTxPrepareRequest) msgs.get(i)).firstClientRequest());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testLockRemoveAfterClientFailed() throws Exception {
ccfg = new CacheConfiguration();
@@ -376,7 +539,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
/**
* @throws Exception If failed.
*/
- public void testPessimisticTxPutAllMultinode() throws Exception {
+ public void _testPessimisticTxPutAllMultinode() throws Exception {
putAllMultinode(null, true);
}
@@ -579,12 +742,21 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
/** */
private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>();
+ /** */
+ private Class<?> recordCls;
+
+ /** */
+ private List<Object> recordedMsgs = new ArrayList<>();
+
/** {@inheritDoc} */
@Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
if (msg instanceof GridIoMessage) {
Object msg0 = ((GridIoMessage)msg).message();
synchronized (this) {
+ if (recordCls != null && msg0.getClass().equals(recordCls))
+ recordedMsgs.add(msg0);
+
Set<UUID> blockNodes = blockCls.get(msg0.getClass());
if (F.contains(blockNodes, node.id())) {
@@ -602,6 +774,28 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
}
/**
+ * @param recordCls Message class to record.
+ */
+ void record(@Nullable Class<?> recordCls) {
+ synchronized (this) {
+ this.recordCls = recordCls;
+ }
+ }
+
+ /**
+ * @return Recorded messages.
+ */
+ List<Object> recordedMessages() {
+ synchronized (this) {
+ List<Object> msgs = recordedMsgs;
+
+ recordedMsgs = new ArrayList<>();
+
+ return msgs;
+ }
+ }
+
+ /**
* @param cls Message class.
* @param nodeId Node ID.
*/