You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 16:11:42 UTC
[21/28] incubator-ignite git commit: ignite-545: merge from sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 4f74303..44b7997 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -221,18 +221,19 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
if (topVer != null) {
tx.topologyVersion(topVer);
- prepare0();
+ prepare0(false);
return;
}
- prepareOnTopology();
+ prepareOnTopology(false, null);
}
/**
- *
+ * @param remap Remap flag.
+ * @param c Optional closure to run after map.
*/
- private void prepareOnTopology() {
+ private void prepareOnTopology(final boolean remap, @Nullable final Runnable c) {
GridDhtTopologyFuture topFut = topologyReadLock();
try {
@@ -265,16 +266,22 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
return;
}
- tx.topologyVersion(topFut.topologyVersion());
+ if (remap)
+ tx.onRemap(topFut.topologyVersion());
+ else
+ tx.topologyVersion(topFut.topologyVersion());
+
+ prepare0(remap);
- prepare0();
+ if (c != null)
+ c.run();
}
else {
topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
- prepareOnTopology();
+ prepareOnTopology(remap, c);
}
});
}
@@ -346,10 +353,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
/**
* Initializes future.
+ *
+ * @param remap Remap flag.
*/
- private void prepare0() {
+ private void prepare0(boolean remap) {
try {
- if (!tx.state(PREPARING)) {
+ boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING);
+
+ if (!txStateCheck) {
if (tx.setRollbackOnly()) {
if (tx.timedOut())
onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
@@ -366,7 +377,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
}
// Make sure to add future before calling prepare.
- cctx.mvcc().addFuture(this);
+ if (!remap)
+ cctx.mvcc().addFuture(this);
prepare(
tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
@@ -502,7 +514,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
tx.implicitSingle(),
m.explicitLock(),
tx.subjectId(),
- tx.taskNameHash());
+ tx.taskNameHash(),
+ m.clientFirst());
for (IgniteTxEntry txEntry : m.writes()) {
if (txEntry.op() == TRANSFORM)
@@ -560,13 +573,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
* @param entry Transaction entry.
* @param topVer Topology version.
* @param cur Current mapping.
+ * @param waitLock Wait lock flag.
* @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key.
* @return Mapping.
*/
private GridDistributedTxMapping map(
IgniteTxEntry entry,
AffinityTopologyVersion topVer,
- GridDistributedTxMapping cur,
+ @Nullable GridDistributedTxMapping cur,
boolean waitLock
) throws IgniteCheckedException {
GridCacheContext cacheCtx = entry.context();
@@ -599,10 +613,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
}
if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
+ boolean clientFirst = cur == null && cctx.kernalContext().clientNode();
+
cur = new GridDistributedTxMapping(primary);
// Initialize near flag right away.
cur.near(cacheCtx.isNear());
+
+ cur.clientFirst(clientFirst);
}
cur.add(entry);
@@ -748,18 +766,47 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
onError(nodeId, mappings, res.error());
}
else {
- onPrepareResponse(m, res);
+ if (res.clientRemapVersion() != null) {
+ assert cctx.kernalContext().clientNode();
+ assert m.clientFirst();
+
+ IgniteInternalFuture<?> affFut = cctx.exchange().affinityReadyFuture(res.clientRemapVersion());
+
+ if (affFut != null && !affFut.isDone()) {
+ affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ remap();
+ }
+ });
+ }
+ else
+ remap();
+ }
+ else {
+ onPrepareResponse(m, res);
- // Proceed prepare before finishing mini future.
- if (mappings != null)
- proceedPrepare(mappings);
+ // Proceed prepare before finishing mini future.
+ if (mappings != null)
+ proceedPrepare(mappings);
- // Finish this mini future.
- onDone(tx);
+ // Finish this mini future.
+ onDone(tx);
+ }
}
}
}
+ /**
+ *
+ */
+ private void remap() {
+ prepareOnTopology(true, new Runnable() {
+ @Override public void run() {
+ onDone(tx);
+ }
+ });
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index bce62c1..7006114 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -84,6 +84,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
/** {@inheritDoc} */
@Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
if (!isDone()) {
+ assert res.clientRemapVersion() == null : res;
+
for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
MiniFuture f = (MiniFuture)fut;
@@ -187,7 +189,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
tx.implicitSingle(),
m.explicitLock(),
tx.subjectId(),
- tx.taskNameHash());
+ tx.taskNameHash(),
+ false);
for (IgniteTxEntry txEntry : m.writes()) {
if (txEntry.op() == TRANSFORM)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index df7a65f..696acfb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -403,7 +403,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
assert nodeId != null;
assert res != null;
- GridNearLockFuture<K, V> fut = (GridNearLockFuture<K, V>)ctx.mvcc().<Boolean>future(res.version(),
+ GridNearLockFuture fut = (GridNearLockFuture)ctx.mvcc().<Boolean>future(res.version(),
res.futureId());
if (fut != null)
@@ -423,7 +423,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
) {
CacheOperationContext opCtx = ctx.operationContextPerCall();
- GridNearLockFuture<K, V> fut = new GridNearLockFuture<>(ctx,
+ GridNearLockFuture fut = new GridNearLockFuture(ctx,
keys,
(GridNearTxLocal)tx,
isRead,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index c38965d..fa8877a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -56,8 +56,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
private static final long serialVersionUID = 0L;
/** DHT mappings. */
- private ConcurrentMap<UUID, GridDistributedTxMapping> mappings =
- new ConcurrentHashMap8<>();
+ private ConcurrentMap<UUID, GridDistributedTxMapping> mappings = new ConcurrentHashMap8<>();
/** Future. */
@GridToStringExclude
@@ -65,13 +64,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** */
@GridToStringExclude
- private final AtomicReference<GridNearTxFinishFuture> commitFut =
- new AtomicReference<>();
+ private final AtomicReference<GridNearTxFinishFuture> commitFut = new AtomicReference<>();
/** */
@GridToStringExclude
- private final AtomicReference<GridNearTxFinishFuture> rollbackFut =
- new AtomicReference<>();
+ private final AtomicReference<GridNearTxFinishFuture> rollbackFut = new AtomicReference<>();
/** Entries to lock on next step of prepare stage. */
private Collection<IgniteTxEntry> optimisticLockEntries = Collections.emptyList();
@@ -85,6 +82,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** Info for entries accessed locally in optimistic transaction. */
private Map<IgniteTxKey, IgniteCacheExpiryPolicy> accessMap;
+ /** */
+ private boolean hasRemoteLocks;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -97,6 +97,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
* @param implicit Implicit flag.
* @param implicitSingle Implicit with one key flag.
* @param sys System flag.
+ * @param plc IO policy.
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout Timeout.
@@ -1185,6 +1186,36 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
/** {@inheritDoc} */
+ @Override public void onRemap(AffinityTopologyVersion topVer) {
+ assert cctx.kernalContext().clientNode();
+
+ mapped.set(false);
+ nearLocallyMapped = false;
+ colocatedLocallyMapped = false;
+ txNodes = null;
+ onePhaseCommit = false;
+ nearMap.clear();
+ dhtMap.clear();
+ mappings.clear();
+
+ this.topVer.set(topVer);
+ }
+
+ /**
+ * @param hasRemoteLocks {@code True} if tx has remote locks acquired.
+ */
+ public void hasRemoteLocks(boolean hasRemoteLocks) {
+ this.hasRemoteLocks = hasRemoteLocks;
+ }
+
+ /**
+ * @return {@code True} if tx has remote locks acquired.
+ */
+ public boolean hasRemoteLocks() {
+ return hasRemoteLocks;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNearTxLocal.class, this, "mappings", mappings.keySet(), "super", super.toString());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index a08637d..b602a7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -75,6 +75,9 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** Task name hash. */
private int taskNameHash;
+ /** {@code True} if first optimistic tx prepare request sent from client node. */
+ private boolean firstClientReq;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -92,8 +95,13 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
* @param txNodes Transaction nodes mapping.
* @param last {@code True} if this last prepare request for node.
* @param lastBackups IDs of backup nodes receiving last prepare request during this prepare.
+ * @param onePhaseCommit One phase commit flag.
+ * @param retVal Return value flag.
+ * @param implicitSingle Implicit single flag.
+ * @param explicitLock Explicit lock flag.
* @param subjId Subject ID.
* @param taskNameHash Task name hash.
+ * @param firstClientReq {@code True} if first optimistic tx prepare request sent from client node.
*/
public GridNearTxPrepareRequest(
IgniteUuid futId,
@@ -110,11 +118,13 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
boolean implicitSingle,
boolean explicitLock,
@Nullable UUID subjId,
- int taskNameHash
+ int taskNameHash,
+ boolean firstClientReq
) {
super(tx, reads, writes, txNodes, onePhaseCommit);
assert futId != null;
+ assert !firstClientReq || tx.optimistic() : tx;
this.futId = futId;
this.topVer = topVer;
@@ -126,6 +136,14 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
this.explicitLock = explicitLock;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
+ this.firstClientReq = firstClientReq;
+ }
+
+ /**
+ * @return {@code True} if first optimistic tx prepare request sent from client node.
+ */
+ public boolean firstClientRequest() {
+ return firstClientReq;
}
/**
@@ -273,60 +291,66 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
writer.incrementState();
case 24:
- if (!writer.writeIgniteUuid("futId", futId))
+ if (!writer.writeBoolean("firstClientReq", firstClientReq))
return false;
writer.incrementState();
case 25:
- if (!writer.writeBoolean("implicitSingle", implicitSingle))
+ if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
case 26:
- if (!writer.writeBoolean("last", last))
+ if (!writer.writeBoolean("implicitSingle", implicitSingle))
return false;
writer.incrementState();
case 27:
- if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID))
+ if (!writer.writeBoolean("last", last))
return false;
writer.incrementState();
case 28:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID))
return false;
writer.incrementState();
case 29:
- if (!writer.writeBoolean("near", near))
+ if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
case 30:
- if (!writer.writeBoolean("retVal", retVal))
+ if (!writer.writeBoolean("near", near))
return false;
writer.incrementState();
case 31:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeBoolean("retVal", retVal))
return false;
writer.incrementState();
case 32:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 33:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 34:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -357,7 +381,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 24:
- futId = reader.readIgniteUuid("futId");
+ firstClientReq = reader.readBoolean("firstClientReq");
if (!reader.isLastRead())
return false;
@@ -365,7 +389,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 25:
- implicitSingle = reader.readBoolean("implicitSingle");
+ futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
return false;
@@ -373,7 +397,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 26:
- last = reader.readBoolean("last");
+ implicitSingle = reader.readBoolean("implicitSingle");
if (!reader.isLastRead())
return false;
@@ -381,7 +405,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 27:
- lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID);
+ last = reader.readBoolean("last");
if (!reader.isLastRead())
return false;
@@ -389,7 +413,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 28:
- miniId = reader.readIgniteUuid("miniId");
+ lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID);
if (!reader.isLastRead())
return false;
@@ -397,7 +421,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 29:
- near = reader.readBoolean("near");
+ miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
return false;
@@ -405,7 +429,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 30:
- retVal = reader.readBoolean("retVal");
+ near = reader.readBoolean("near");
if (!reader.isLastRead())
return false;
@@ -413,7 +437,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 31:
- subjId = reader.readUuid("subjId");
+ retVal = reader.readBoolean("retVal");
if (!reader.isLastRead())
return false;
@@ -421,7 +445,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 32:
- taskNameHash = reader.readInt("taskNameHash");
+ subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
return false;
@@ -429,6 +453,14 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 33:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 34:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -448,7 +480,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 34;
+ return 35;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index f8c07f7..0f0b2c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import org.apache.ignite.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -27,6 +28,7 @@ import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
import java.io.*;
import java.nio.*;
@@ -83,6 +85,9 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
@GridDirectCollection(IgniteTxKey.class)
private Collection<IgniteTxKey> filterFailedKeys;
+ /** Not {@code null} if client node should remap transaction. */
+ private AffinityTopologyVersion clientRemapVer;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -95,9 +100,11 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
* @param futId Future ID.
* @param miniId Mini future ID.
* @param dhtVer DHT version.
+ * @param writeVer Write version.
* @param invalidParts Invalid partitions.
* @param retVal Return value.
* @param err Error.
+ * @param clientRemapVer Not {@code null} if client node should remap transaction.
*/
public GridNearTxPrepareResponse(
GridCacheVersion xid,
@@ -107,7 +114,8 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
GridCacheVersion writeVer,
Collection<Integer> invalidParts,
GridCacheReturn retVal,
- Throwable err
+ Throwable err,
+ AffinityTopologyVersion clientRemapVer
) {
super(xid, err);
@@ -121,6 +129,14 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
this.writeVer = writeVer;
this.invalidParts = invalidParts;
this.retVal = retVal;
+ this.clientRemapVer = clientRemapVer;
+ }
+
+ /**
+ * @return {@code True} if client node should remap transaction.
+ */
+ @Nullable public AffinityTopologyVersion clientRemapVersion() {
+ return clientRemapVer;
}
/**
@@ -330,60 +346,66 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
switch (writer.state()) {
case 10:
- if (!writer.writeMessage("dhtVer", dhtVer))
+ if (!writer.writeMessage("clientRemapVer", clientRemapVer))
return false;
writer.incrementState();
case 11:
- if (!writer.writeCollection("filterFailedKeys", filterFailedKeys, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("dhtVer", dhtVer))
return false;
writer.incrementState();
case 12:
- if (!writer.writeIgniteUuid("futId", futId))
+ if (!writer.writeCollection("filterFailedKeys", filterFailedKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 13:
- if (!writer.writeCollection("invalidParts", invalidParts, MessageCollectionItemType.INT))
+ if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
case 14:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeCollection("invalidParts", invalidParts, MessageCollectionItemType.INT))
return false;
writer.incrementState();
case 15:
- if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG))
+ if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
case 16:
- if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG))
+ if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 17:
- if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
+ if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 18:
- if (!writer.writeMessage("retVal", retVal))
+ if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 19:
+ if (!writer.writeMessage("retVal", retVal))
+ return false;
+
+ writer.incrementState();
+
+ case 20:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -406,7 +428,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
switch (reader.state()) {
case 10:
- dhtVer = reader.readMessage("dhtVer");
+ clientRemapVer = reader.readMessage("clientRemapVer");
if (!reader.isLastRead())
return false;
@@ -414,7 +436,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
case 11:
- filterFailedKeys = reader.readCollection("filterFailedKeys", MessageCollectionItemType.MSG);
+ dhtVer = reader.readMessage("dhtVer");
if (!reader.isLastRead())
return false;
@@ -422,7 +444,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
case 12:
- futId = reader.readIgniteUuid("futId");
+ filterFailedKeys = reader.readCollection("filterFailedKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -430,7 +452,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
case 13:
- invalidParts = reader.readCollection("invalidParts", MessageCollectionItemType.INT);
+ futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
return false;
@@ -438,7 +460,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
case 14:
- miniId = reader.readIgniteUuid("miniId");
+ invalidParts = reader.readCollection("invalidParts", MessageCollectionItemType.INT);
if (!reader.isLastRead())
return false;
@@ -446,7 +468,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
case 15:
- ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG);
+ miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
return false;
@@ -454,7 +476,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
case 16:
- ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG);
+ ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -462,7 +484,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
case 17:
- pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
+ ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -470,7 +492,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
case 18:
- retVal = reader.readMessage("retVal");
+ pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -478,6 +500,14 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
case 19:
+ retVal = reader.readMessage("retVal");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 20:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -497,7 +527,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 20;
+ return 21;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index 6120e25..4adcff5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -41,7 +41,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
private static final long serialVersionUID = 0L;
/** */
- private GridCachePreloader<K,V> preldr;
+ private GridCachePreloader preldr;
/**
* Empty constructor required by {@link Externalizable}.
@@ -56,7 +56,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
public GridLocalCache(GridCacheContext<K, V> ctx) {
super(ctx, ctx.config().getStartSize());
- preldr = new GridCachePreloaderAdapter<>(ctx);
+ preldr = new GridCachePreloaderAdapter(ctx);
}
/** {@inheritDoc} */
@@ -65,7 +65,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public GridCachePreloader<K, V> preloader() {
+ @Override public GridCachePreloader preloader() {
return preldr;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 819b0f0..bcbdec4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -53,7 +53,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
private static final sun.misc.Unsafe UNSAFE = GridUnsafe.unsafe();
/** */
- private GridCachePreloader<K,V> preldr;
+ private GridCachePreloader preldr;
/**
* Empty constructor required by {@link Externalizable}.
@@ -68,7 +68,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
public GridLocalAtomicCache(GridCacheContext<K, V> ctx) {
super(ctx, ctx.config().getStartSize());
- preldr = new GridCachePreloaderAdapter<>(ctx);
+ preldr = new GridCachePreloaderAdapter(ctx);
}
/** {@inheritDoc} */
@@ -94,7 +94,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public GridCachePreloader<K, V> preloader() {
+ @Override public GridCachePreloader preloader() {
return preldr;
}
@@ -119,7 +119,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
@Override public boolean put(K key, V val, CacheEntryPredicate[] filter) throws IgniteCheckedException {
A.notNull(key, "key", val, "val");
- return (Boolean)updateAllInternal(UPDATE,
+ boolean statsEnabled = ctx.config().isStatisticsEnabled();
+
+ long start = statsEnabled ? System.nanoTime() : 0L;
+
+ boolean res = (Boolean)updateAllInternal(UPDATE,
Collections.singleton(key),
Collections.singleton(val),
null,
@@ -129,6 +133,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
filter,
ctx.writeThrough(),
ctx.readThrough());
+
+ if (statsEnabled)
+ metrics0().addPutTimeNanos(System.nanoTime() - start);
+
+ return res;
}
/** {@inheritDoc} */
@@ -268,6 +277,10 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public void putAll(Map<? extends K, ? extends V> m) throws IgniteCheckedException {
+ boolean statsEnabled = ctx.config().isStatisticsEnabled();
+
+ long start = statsEnabled ? System.nanoTime() : 0L;
+
updateAllInternal(UPDATE,
m.keySet(),
m.values(),
@@ -278,6 +291,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
CU.empty0(),
ctx.writeThrough(),
ctx.readThrough());
+
+ if (statsEnabled)
+ metrics0().addPutTimeNanos(System.nanoTime() - start);
}
/** {@inheritDoc} */
@@ -727,7 +743,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
final ExpiryPolicy expiry = expiryPerCall();
- return asyncOp(new Callable<Object>() {
+ IgniteInternalFuture fut = asyncOp(new Callable<Object>() {
@Override public Object call() throws Exception {
return updateAllInternal(op,
keys,
@@ -741,6 +757,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
readThrough);
}
});
+
+ if (ctx.config().isStatisticsEnabled())
+ fut.listen(new UpdatePutTimeStatClosure(metrics0(), System.nanoTime()));
+
+ return fut;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 4b1fc87..7f0a5ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -348,17 +348,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
* @param duration Duration.
*/
public void onExecuted(Object res, Throwable err, long startTime, long duration) {
- boolean fail = err != null;
-
- // Update own metrics.
- metrics.onQueryExecute(duration, fail);
-
- // Update metrics in query manager.
- cctx.queries().onMetricsUpdate(duration, fail);
-
- if (log.isDebugEnabled())
- log.debug("Query execution finished [qry=" + this + ", startTime=" + startTime +
- ", duration=" + duration + ", fail=" + fail + ", res=" + res + ']');
+ GridQueryProcessor.onExecuted(cctx, metrics, res, err, startTime, duration, log);
}
/** {@inheritDoc} */
@@ -376,10 +366,12 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
return execute(null, rmtTransform, args);
}
+ /** {@inheritDoc} */
@Override public QueryMetrics metrics() {
return metrics.copy();
}
+ /** {@inheritDoc} */
@Override public void resetMetrics() {
metrics = new GridCacheQueryMetricsAdapter();
}
@@ -470,10 +462,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx, @Nullable final ClusterGroup prj) {
assert cctx != null;
- return F.view(CU.allNodes(cctx), new P1<ClusterNode>() {
+ Collection<ClusterNode> affNodes = CU.affinityNodes(cctx);
+
+ if (prj == null)
+ return affNodes;
+
+ return F.view(affNodes, new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode n) {
- return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
- (prj == null || prj.node(n.id()) != null);
+ return prj.node(n.id()) != null;
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java
index 2999e7b..15eb368 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java
@@ -43,6 +43,8 @@ public class GridCacheQueryErrorFuture<T> extends GridFinishedFuture<Collection<
/** {@inheritDoc} */
@Nullable @Override public T next() throws IgniteCheckedException {
+ get();
+
return null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 16a8028..32e9d63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -773,7 +773,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc);
- private Iterator<K> iter = backups ? prj.keySet().iterator() : prj.primaryKeySet().iterator();
+ private Iterator<K> iter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator();
{
advance();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 759a949..6277c5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -250,8 +250,13 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @return Continuous routine ID.
* @throws IgniteCheckedException In case of error.
*/
- public UUID executeQuery(CacheEntryUpdatedListener locLsnr, CacheEntryEventSerializableFilter rmtFilter,
- int bufSize, long timeInterval, boolean autoUnsubscribe, ClusterGroup grp) throws IgniteCheckedException {
+ public UUID executeQuery(CacheEntryUpdatedListener locLsnr,
+ CacheEntryEventSerializableFilter rmtFilter,
+ int bufSize,
+ long timeInterval,
+ boolean autoUnsubscribe,
+ ClusterGroup grp) throws IgniteCheckedException
+ {
return executeQuery0(
locLsnr,
rmtFilter,
@@ -301,7 +306,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
try {
cctx.kernalContext().continuous().stopRoutine(routineId).get();
}
- catch (IgniteCheckedException e) {
+ catch (IgniteCheckedException | IgniteException e) {
if (log.isDebugEnabled())
log.debug("Failed to stop internal continuous query: " + e.getMessage());
}
@@ -357,9 +362,18 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @return Continuous routine ID.
* @throws IgniteCheckedException In case of error.
*/
- private UUID executeQuery0(CacheEntryUpdatedListener locLsnr, final CacheEntryEventSerializableFilter rmtFilter,
- int bufSize, long timeInterval, boolean autoUnsubscribe, boolean internal, boolean notifyExisting,
- boolean oldValRequired, boolean sync, boolean ignoreExpired, ClusterGroup grp) throws IgniteCheckedException {
+ private UUID executeQuery0(CacheEntryUpdatedListener locLsnr,
+ final CacheEntryEventSerializableFilter rmtFilter,
+ int bufSize,
+ long timeInterval,
+ boolean autoUnsubscribe,
+ boolean internal,
+ boolean notifyExisting,
+ boolean oldValRequired,
+ boolean sync,
+ boolean ignoreExpired,
+ ClusterGroup grp) throws IgniteCheckedException
+ {
cctx.checkSecurity(SecurityPermission.CACHE_READ);
if (grp == null)
@@ -745,7 +759,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- impl = (CacheEntryEventSerializableFilter)in.readObject();
+ impl = (CacheEntryEventFilter)in.readObject();
types = in.readByte();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java
index 5fde622..02fe679 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache.store;
-import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
index d9f50ac..a14df6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
@@ -68,6 +68,11 @@ public interface CacheStoreManager<K, V> extends GridCacheManager<K, V> {
public boolean isWriteThrough();
/**
+ * @return {@code True} is write-behind is enabled.
+ */
+ public boolean isWriteBehind();
+
+ /**
* @return Whether DHT transaction can write to store from DHT.
*/
public boolean isWriteToStoreFromDht();
@@ -160,7 +165,7 @@ public interface CacheStoreManager<K, V> extends GridCacheManager<K, V> {
* @param commit Commit.
* @throws IgniteCheckedException If failed.
*/
- public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException;
+ public void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last) throws IgniteCheckedException;
/**
* End session initiated by write-behind store.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index f9a966c..b4a146a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -59,11 +59,20 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
private ThreadLocal<SessionData> sesHolder;
/** */
+ private ThreadLocalSession locSes;
+
+ /** */
private boolean locStore;
/** */
private boolean writeThrough;
+ /** */
+ private Collection<CacheStoreSessionListener> sesLsnrs;
+
+ /** */
+ private boolean globalSesLsnrs;
+
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void initialize(@Nullable CacheStore cfgStore, Map sesHolders) throws IgniteCheckedException {
@@ -84,14 +93,15 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
sesHolder0 = ((Map<CacheStore, ThreadLocal>)sesHolders).get(cfgStore);
if (sesHolder0 == null) {
- ThreadLocalSession locSes = new ThreadLocalSession();
+ sesHolder0 = new ThreadLocal<>();
- if (ctx.resource().injectStoreSession(cfgStore, locSes)) {
- sesHolder0 = locSes.sesHolder;
+ locSes = new ThreadLocalSession(sesHolder0);
+ if (ctx.resource().injectStoreSession(cfgStore, locSes))
sesHolders.put(cfgStore, sesHolder0);
- }
}
+ else
+ locSes = new ThreadLocalSession(sesHolder0);
}
sesHolder = sesHolder0;
@@ -148,6 +158,24 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
throw new IgniteCheckedException("Failed to start cache store: " + e, e);
}
}
+
+ CacheConfiguration cfg = cctx.config();
+
+ if (cfgStore != null && !cfg.isWriteThrough() && !cfg.isReadThrough()) {
+ U.quietAndWarn(log,
+ "Persistence store is configured, but both read-through and write-through are disabled. This " +
+ "configuration makes sense if the store implements loadCache method only. If this is the " +
+ "case, ignore this warning. Otherwise, fix the configuration for cache: " + cfg.getName(),
+ "Persistence store is configured, but both read-through and write-through are disabled.");
+ }
+
+ sesLsnrs = CU.startStoreSessionListeners(cctx.kernalContext(), cfg.getCacheStoreSessionListenerFactories());
+
+ if (sesLsnrs == null) {
+ sesLsnrs = cctx.shared().storeSessionListeners();
+
+ globalSesLsnrs = true;
+ }
}
/** {@inheritDoc} */
@@ -164,6 +192,15 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
U.error(log(), "Failed to stop cache store.", e);
}
}
+
+ if (!globalSesLsnrs) {
+ try {
+ CU.stopStoreSessionListeners(cctx.kernalContext(), sesLsnrs);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to stop store session listeners for cache: " + cctx.name(), e);
+ }
+ }
}
/** {@inheritDoc} */
@@ -215,14 +252,14 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
sessionInit0(tx);
- boolean thewEx = true;
+ boolean threwEx = true;
Object val = null;
try {
val = singleThreadGate.load(storeKey);
- thewEx = false;
+ threwEx = false;
}
catch (ClassCastException e) {
handleClassCastException(e);
@@ -234,7 +271,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
throw new IgniteCheckedException(new CacheLoaderException(e));
}
finally {
- sessionEnd0(tx, thewEx);
+ sessionEnd0(tx, threwEx);
}
if (log.isDebugEnabled())
@@ -264,8 +301,13 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
/** {@inheritDoc} */
+ @Override public boolean isWriteBehind() {
+ return cctx.config().isWriteBehindEnabled();
+ }
+
+ /** {@inheritDoc} */
@Override public boolean isWriteToStoreFromDht() {
- return cctx.config().isWriteBehindEnabled() || locStore;
+ return isWriteBehind() || locStore;
}
/** {@inheritDoc} */
@@ -349,7 +391,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
sessionInit0(tx);
- boolean thewEx = true;
+ boolean threwEx = true;
try {
IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() {
@@ -380,7 +422,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
else
singleThreadGate.loadAll(keys0, c);
- thewEx = false;
+ threwEx = false;
}
catch (ClassCastException e) {
handleClassCastException(e);
@@ -392,7 +434,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
throw new IgniteCheckedException(new CacheLoaderException(e));
}
finally {
- sessionEnd0(tx, thewEx);
+ sessionEnd0(tx, threwEx);
}
if (log.isDebugEnabled())
@@ -408,7 +450,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
sessionInit0(null);
- boolean thewEx = true;
+ boolean threwEx = true;
try {
store.loadCache(new IgniteBiInClosure<Object, Object>() {
@@ -431,7 +473,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
}, args);
- thewEx = false;
+ threwEx = false;
}
catch (CacheLoaderException e) {
throw new IgniteCheckedException(e);
@@ -440,7 +482,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
throw new IgniteCheckedException(new CacheLoaderException(e));
}
finally {
- sessionEnd0(null, thewEx);
+ sessionEnd0(null, threwEx);
}
if (log.isDebugEnabled())
@@ -473,12 +515,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
sessionInit0(tx);
- boolean thewEx = true;
+ boolean threwEx = true;
try {
store.write(new CacheEntryImpl<>(key, locStore ? F.t(val, ver) : val));
- thewEx = false;
+ threwEx = false;
}
catch (ClassCastException e) {
handleClassCastException(e);
@@ -490,7 +532,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
throw new IgniteCheckedException(new CacheWriterException(e));
}
finally {
- sessionEnd0(tx, thewEx);
+ sessionEnd0(tx, threwEx);
}
if (log.isDebugEnabled())
@@ -522,12 +564,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
sessionInit0(tx);
- boolean thewEx = true;
+ boolean threwEx = true;
try {
store.writeAll(entries);
- thewEx = false;
+ threwEx = false;
}
catch (ClassCastException e) {
handleClassCastException(e);
@@ -548,7 +590,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
throw new IgniteCheckedException(e);
}
finally {
- sessionEnd0(tx, thewEx);
+ sessionEnd0(tx, threwEx);
}
if (log.isDebugEnabled())
@@ -576,12 +618,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
sessionInit0(tx);
- boolean thewEx = true;
+ boolean threwEx = true;
try {
store.delete(key);
- thewEx = false;
+ threwEx = false;
}
catch (ClassCastException e) {
handleClassCastException(e);
@@ -593,7 +635,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
throw new IgniteCheckedException(new CacheWriterException(e));
}
finally {
- sessionEnd0(tx, thewEx);
+ sessionEnd0(tx, threwEx);
}
if (log.isDebugEnabled())
@@ -606,8 +648,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
/** {@inheritDoc} */
- @Override public boolean removeAll(@Nullable IgniteInternalTx tx, Collection keys)
- throws IgniteCheckedException {
+ @Override public boolean removeAll(@Nullable IgniteInternalTx tx, Collection keys) throws IgniteCheckedException {
if (F.isEmpty(keys))
return true;
@@ -625,12 +666,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
sessionInit0(tx);
- boolean thewEx = true;
+ boolean threwEx = true;
try {
store.deleteAll(keys0);
- thewEx = false;
+ threwEx = false;
}
catch (ClassCastException e) {
handleClassCastException(e);
@@ -645,7 +686,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
throw new IgniteCheckedException(e);
}
finally {
- sessionEnd0(tx, thewEx);
+ sessionEnd0(tx, threwEx);
}
if (log.isDebugEnabled())
@@ -669,16 +710,27 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
/** {@inheritDoc} */
- @Override public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException {
+ @Override public void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last) throws IgniteCheckedException {
assert store != null;
sessionInit0(tx);
try {
- store.sessionEnd(commit);
+ if (sesLsnrs != null) {
+ for (CacheStoreSessionListener lsnr : sesLsnrs)
+ lsnr.onSessionEnd(locSes, commit);
+ }
+
+ if (!sesHolder.get().ended(store))
+ store.sessionEnd(commit);
+ }
+ catch (Throwable e) {
+ last = true;
+
+ throw e;
}
finally {
- if (sesHolder != null) {
+ if (last && sesHolder != null) {
sesHolder.set(null);
tx.removeMeta(SES_ATTR);
@@ -715,10 +767,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
* @param tx Current transaction.
*/
private void sessionInit0(@Nullable IgniteInternalTx tx) {
- if (sesHolder == null)
- return;
-
- assert sesHolder.get() == null;
+ assert sesHolder != null;
SessionData ses;
@@ -738,6 +787,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
ses = new SessionData(null, cctx.name());
sesHolder.set(ses);
+
+ if (sesLsnrs != null && !ses.started(this)) {
+ for (CacheStoreSessionListener lsnr : sesLsnrs)
+ lsnr.onSessionStart(locSes);
+ }
}
/**
@@ -745,8 +799,16 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
*/
private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws IgniteCheckedException {
try {
- if (tx == null)
- store.sessionEnd(threwEx);
+ if (tx == null) {
+ if (sesLsnrs != null) {
+ for (CacheStoreSessionListener lsnr : sesLsnrs)
+ lsnr.onSessionEnd(locSes, !threwEx);
+ }
+
+ assert !sesHolder.get().ended(store);
+
+ store.sessionEnd(!threwEx);
+ }
}
catch (Exception e) {
if (!threwEx)
@@ -788,6 +850,16 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
@GridToStringInclude
private Map<Object, Object> props;
+ /** */
+ private Object attachment;
+
+ /** */
+ private final Set<CacheStoreManager> started =
+ new GridSetWrapper<>(new IdentityHashMap<CacheStoreManager, Object>());
+
+ /** */
+ private final Set<CacheStore> ended = new GridSetWrapper<>(new IdentityHashMap<CacheStore, Object>());
+
/**
* @param tx Current transaction.
* @param cacheName Cache name.
@@ -815,6 +887,24 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
/**
+ * @param attachment Attachment.
+ */
+ private Object attach(Object attachment) {
+ Object prev = this.attachment;
+
+ this.attachment = attachment;
+
+ return prev;
+ }
+
+ /**
+ * @return Attachment.
+ */
+ private Object attachment() {
+ return attachment;
+ }
+
+ /**
* @return Cache name.
*/
private String cacheName() {
@@ -828,6 +918,21 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
this.cacheName = cacheName;
}
+ /**
+ * @return If session is started.
+ */
+ private boolean started(CacheStoreManager mgr) {
+ return !started.add(mgr);
+ }
+
+ /**
+ * @param store Cache store.
+ * @return Whether session already ended on this store instance.
+ */
+ private boolean ended(CacheStore store) {
+ return !ended.add(store);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SessionData.class, this, "tx", CU.txString(tx));
@@ -839,7 +944,14 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
*/
private static class ThreadLocalSession implements CacheStoreSession {
/** */
- private final ThreadLocal<SessionData> sesHolder = new ThreadLocal<>();
+ private final ThreadLocal<SessionData> sesHolder;
+
+ /**
+ * @param sesHolder Session holder.
+ */
+ private ThreadLocalSession(ThreadLocal<SessionData> sesHolder) {
+ this.sesHolder = sesHolder;
+ }
/** {@inheritDoc} */
@Nullable @Override public Transaction transaction() {
@@ -854,6 +966,20 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
/** {@inheritDoc} */
+ @Override public Object attach(@Nullable Object attachment) {
+ SessionData ses0 = sesHolder.get();
+
+ return ses0 != null ? ses0.attach(attachment) : null;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public <T> T attachment() {
+ SessionData ses0 = sesHolder.get();
+
+ return ses0 != null ? (T)ses0.attachment() : null;
+ }
+
+ /** {@inheritDoc} */
@Override public <K1, V1> Map<K1, V1> properties() {
SessionData ses0 = sesHolder.get();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 5f877ec..cb86e0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -706,4 +706,9 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
* @return Public API proxy.
*/
public TransactionProxy proxy();
+
+ /**
+ * @param topVer New topology version.
+ */
+ public void onRemap(AffinityTopologyVersion topVer);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index eb8825e..9e8950f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -184,7 +184,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
private AtomicReference<GridFutureAdapter<IgniteInternalTx>> finFut = new AtomicReference<>();
/** Topology version. */
- private AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>(AffinityTopologyVersion.NONE);
+ protected AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>(AffinityTopologyVersion.NONE);
/** Mutex. */
private final Lock lock = new ReentrantLock();
@@ -405,7 +405,21 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public boolean storeUsed() {
- return storeEnabled() && store() != null;
+ if (!storeEnabled())
+ return false;
+
+ Collection<Integer> cacheIds = activeCacheIds();
+
+ if (!cacheIds.isEmpty()) {
+ for (int cacheId : cacheIds) {
+ CacheStoreManager store = cctx.cacheContext(cacheId).store();
+
+ if (store.configured())
+ return true;
+ }
+ }
+
+ return false;
}
/**
@@ -413,13 +427,20 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
*
* @return Store manager.
*/
- protected CacheStoreManager store() {
- if (!activeCacheIds().isEmpty()) {
- int cacheId = F.first(activeCacheIds());
+ protected Collection<CacheStoreManager> stores() {
+ Collection<Integer> cacheIds = activeCacheIds();
+
+ if (!cacheIds.isEmpty()) {
+ Collection<CacheStoreManager> stores = new ArrayList<>(cacheIds.size());
- CacheStoreManager store = cctx.cacheContext(cacheId).store();
+ for (int cacheId : cacheIds) {
+ CacheStoreManager store = cctx.cacheContext(cacheId).store();
- return store.configured() ? store : null;
+ if (store.configured())
+ stores.add(store);
+ }
+
+ return stores;
}
return null;
@@ -493,13 +514,17 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
+ @Override public void onRemap(AffinityTopologyVersion topVer) {
+ assert false : this;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean hasTransforms() {
return transform;
}
/** {@inheritDoc} */
- @Override
- public boolean markPreparing() {
+ @Override public boolean markPreparing() {
return preparing.compareAndSet(false, true);
}
@@ -1716,6 +1741,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
+ @Override public void onRemap(AffinityTopologyVersion topVer) {
+ throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
+ }
+
+ /** {@inheritDoc} */
@Override public boolean empty() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index f466bf2..e6d71aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
@@ -229,14 +230,22 @@ public class IgniteTxHandler {
return null;
}
+ IgniteTxEntry firstEntry = null;
+
try {
- for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes()))
+ for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) {
e.unmarshal(ctx, false, ctx.deploy().globalLoader());
+
+ if (firstEntry == null)
+ firstEntry = e;
+ }
}
catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}
+ assert firstEntry != null : req;
+
GridDhtTxLocal tx;
GridCacheVersion mappedVer = ctx.tm().mappedVersion(req.version());
@@ -253,36 +262,88 @@ public class IgniteTxHandler {
}
}
else {
- tx = new GridDhtTxLocal(
- ctx,
- nearNode.id(),
- req.version(),
- req.futureId(),
- req.miniId(),
- req.threadId(),
- req.implicitSingle(),
- req.implicitSingle(),
- req.system(),
- req.explicitLock(),
- req.policy(),
- req.concurrency(),
- req.isolation(),
- req.timeout(),
- req.isInvalidate(),
- false,
- req.txSize(),
- req.transactionNodes(),
- req.subjectId(),
- req.taskNameHash()
- );
+ GridDhtPartitionTopology top = null;
- tx = ctx.tm().onCreated(null, tx);
+ if (req.firstClientRequest()) {
+ assert req.concurrency().equals(OPTIMISTIC) : req;
+ assert CU.clientNode(nearNode) : nearNode;
- if (tx != null)
- tx.topologyVersion(req.topologyVersion());
- else
- U.warn(log, "Failed to create local transaction (was transaction rolled back?) [xid=" +
- req.version() + ", req=" + req + ']');
+ top = firstEntry.context().topology();
+
+ top.readLock();
+ }
+
+ try {
+ if (top != null && needRemap(req.topologyVersion(), top.topologyVersion(), req)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Client topology version mismatch, need remap transaction [" +
+ "reqTopVer=" + req.topologyVersion() +
+ ", locTopVer=" + top.topologyVersion() +
+ ", req=" + req + ']');
+ }
+
+ GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.version(),
+ req.version(),
+ null,
+ null,
+ null,
+ top.topologyVersion());
+
+ try {
+ ctx.io().send(nearNode, res, req.policy());
+ }
+ catch (ClusterTopologyCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send client tx remap response, client node failed " +
+ "[node=" + nearNode + ", req=" + req + ']');
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send client tx remap response " +
+ "[node=" + nearNode + ", req=" + req + ']', e);
+ }
+
+ return new GridFinishedFuture<>(res);
+ }
+
+ tx = new GridDhtTxLocal(
+ ctx,
+ nearNode.id(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.threadId(),
+ req.implicitSingle(),
+ req.implicitSingle(),
+ req.system(),
+ req.explicitLock(),
+ req.policy(),
+ req.concurrency(),
+ req.isolation(),
+ req.timeout(),
+ req.isInvalidate(),
+ false,
+ req.txSize(),
+ req.transactionNodes(),
+ req.subjectId(),
+ req.taskNameHash()
+ );
+
+ tx = ctx.tm().onCreated(null, tx);
+
+ if (tx != null)
+ tx.topologyVersion(req.topologyVersion());
+ else
+ U.warn(log, "Failed to create local transaction (was transaction rolled back?) [xid=" +
+ req.version() + ", req=" + req + ']');
+ }
+ finally {
+ if (top != null)
+ top.readUnlock();
+ }
}
if (tx != null) {
@@ -343,6 +404,31 @@ public class IgniteTxHandler {
}
/**
+ * @param expVer Expected topology version.
+ * @param curVer Current topology version.
+ * @param req Request.
+ * @return {@code True} if cache affinity changed and request should be remapped.
+ */
+ private boolean needRemap(AffinityTopologyVersion expVer,
+ AffinityTopologyVersion curVer,
+ GridNearTxPrepareRequest req) {
+ if (expVer.equals(curVer))
+ return false;
+
+ for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) {
+ GridCacheContext ctx = e.context();
+
+ Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.name(), expVer);
+ Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.name(), curVer);
+
+ if (!cacheNodes0.equals(cacheNodes1))
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
* @param nodeId Node ID.
* @param res Response.
*/
@@ -1024,8 +1110,10 @@ public class IgniteTxHandler {
return null;
}
}
- else
+ else {
+ tx.writeVersion(req.writeVersion());
tx.transactionNodes(req.transactionNodes());
+ }
if (!tx.isSystemInvalidate() && !F.isEmpty(req.writes())) {
int idx = 0;