You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/11 08:32:04 UTC
[3/3] ignite git commit: IGNITE-264 - Check backup node for one-phase
transaction when primary node crashes.
IGNITE-264 - Check backup node for one-phase transaction when primary node crashes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e1707b68
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e1707b68
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e1707b68
Branch: refs/heads/ignite-1.4
Commit: e1707b6852f9d7c3e4999ea1d3967db68e7d8634
Parents: 071586e
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Thu Sep 10 23:29:33 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Thu Sep 10 23:29:33 2015 -0700
----------------------------------------------------------------------
.../internal/portable/PortableContext.java | 18 +-
.../processors/cache/GridCacheAdapter.java | 6 +
.../processors/cache/GridCacheContext.java | 4 +-
.../processors/cache/GridCacheIoManager.java | 4 +-
.../processors/cache/GridCacheMvcc.java | 5 +-
.../distributed/GridCacheTxRecoveryFuture.java | 11 +-
.../distributed/GridDistributedCacheEntry.java | 6 +-
.../GridDistributedTxFinishRequest.java | 13 +-
.../GridDistributedTxRemoteAdapter.java | 10 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 514 +++++++++----------
.../distributed/dht/GridDhtTxFinishFuture.java | 15 +-
.../distributed/dht/GridDhtTxFinishRequest.java | 84 ++-
.../dht/GridDhtTxFinishResponse.java | 89 +++-
.../cache/distributed/dht/GridDhtTxLocal.java | 4 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 67 ++-
.../distributed/dht/GridDhtTxPrepareFuture.java | 32 +-
.../cache/distributed/dht/GridDhtTxRemote.java | 40 +-
.../dht/GridPartitionedGetFuture.java | 4 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 4 +-
.../colocated/GridDhtColocatedLockFuture.java | 11 +-
.../distributed/near/GridNearLockFuture.java | 11 +-
.../distributed/near/GridNearLockRequest.java | 18 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 52 +-
.../GridNearPessimisticTxPrepareFuture.java | 11 +-
.../near/GridNearTxFinishFuture.java | 319 ++++++++++--
.../near/GridNearTxFinishRequest.java | 20 +-
.../cache/distributed/near/GridNearTxLocal.java | 64 +--
.../distributed/near/GridNearTxRemote.java | 38 +-
.../cache/transactions/IgniteTxAdapter.java | 5 +-
.../cache/transactions/IgniteTxHandler.java | 281 +++++-----
.../transactions/IgniteTxLocalAdapter.java | 37 +-
.../cache/transactions/IgniteTxManager.java | 48 +-
.../datastructures/DataStructuresProcessor.java | 102 ++--
.../GridTransactionalCacheQueueImpl.java | 15 +-
.../processors/igfs/IgfsFileAffinityRange.java | 4 +-
.../portable/GridPortableMetaDataSelfTest.java | 2 +
.../CacheStoreUsageMultinodeAbstractTest.java | 16 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 50 +-
.../processors/cache/GridCacheMvccSelfTest.java | 4 +-
.../cache/GridCachePutAllFailoverSelfTest.java | 28 +-
.../cache/IgniteCachePutAllRestartTest.java | 2 +
.../cache/IgniteInternalCacheTypesTest.java | 4 +-
.../cache/IgniteOnePhaseCommitNearSelfTest.java | 243 +++++++++
...ridCachePartitionNotLoadedEventSelfTest.java | 27 +-
.../GridCacheTransformEventSelfTest.java | 5 +-
.../dht/GridCacheTxNodeFailureSelfTest.java | 400 +++++++++++++++
.../dht/GridNearCacheTxNodeFailureSelfTest.java | 31 ++
...gniteAtomicLongChangingTopologySelfTest.java | 283 ++++++++++
.../near/IgniteCacheNearOnlyTxTest.java | 14 +-
.../IgniteCacheFailoverTestSuite.java | 9 +-
50 files changed, 2350 insertions(+), 734 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
index c64adc8..165ad9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
@@ -440,7 +440,7 @@ public class PortableContext implements Externalizable {
PortableClassDescriptor desc = descByCls.get(cls);
if (desc == null || !desc.registered())
- desc = registerClassDescriptor(cls);
+ desc = registerClassDescriptor(cls, true);
return desc;
}
@@ -485,7 +485,7 @@ public class PortableContext implements Externalizable {
}
if (desc == null) {
- desc = registerClassDescriptor(cls);
+ desc = registerClassDescriptor(cls, false);
assert desc.typeId() == typeId;
}
@@ -499,7 +499,7 @@ public class PortableContext implements Externalizable {
* @param cls Class.
* @return Class descriptor.
*/
- private PortableClassDescriptor registerClassDescriptor(Class<?> cls) {
+ private PortableClassDescriptor registerClassDescriptor(Class<?> cls, boolean registerMetadata) {
PortableClassDescriptor desc;
String clsName = cls.getName();
@@ -525,7 +525,7 @@ public class PortableContext implements Externalizable {
desc = old;
}
else
- desc = registerUserClassDescriptor(cls);
+ desc = registerUserClassDescriptor(cls, registerMetadata);
return desc;
}
@@ -536,9 +536,7 @@ public class PortableContext implements Externalizable {
* @param cls Class.
* @return Class descriptor.
*/
- private PortableClassDescriptor registerUserClassDescriptor(Class<?> cls) {
- PortableClassDescriptor desc;
-
+ private PortableClassDescriptor registerUserClassDescriptor(Class<?> cls, boolean registerMetadata) {
boolean registered;
String typeName = typeName(cls.getName());
@@ -555,7 +553,7 @@ public class PortableContext implements Externalizable {
throw new PortableException("Failed to register class.", e);
}
- desc = new PortableClassDescriptor(this,
+ PortableClassDescriptor desc = new PortableClassDescriptor(this,
cls,
true,
typeId,
@@ -573,6 +571,10 @@ public class PortableContext implements Externalizable {
userTypes.put(typeId, desc);
descByCls.put(cls, desc);
+ // TODO uncomment for https://issues.apache.org/jira/browse/IGNITE-1377
+// if (registerMetadata && isMetaDataEnabled(typeId))
+// metaHnd.addMeta(typeId, new PortableMetaDataImpl(typeName, desc.fieldsMeta(), null));
+
return desc;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 79c5e4b..4460a2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4230,6 +4230,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
try {
return tFut.get();
}
+ catch (IgniteTxRollbackCheckedException e) {
+ throw e;
+ }
catch (IgniteCheckedException e1) {
tx0.rollbackAsync();
@@ -4253,6 +4256,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
try {
return tFut.get();
}
+ catch (IgniteTxRollbackCheckedException e) {
+ throw e;
+ }
catch (IgniteCheckedException e1) {
tx0.rollbackAsync();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 3d0f1ae..86ba3e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -480,7 +480,7 @@ public class GridCacheContext<K, V> implements Externalizable {
* @return {@code True} if should use system transactions which are isolated from user transactions.
*/
public boolean systemTx() {
- return cacheType == CacheType.UTILITY;
+ return cacheType == CacheType.UTILITY || (cacheType == CacheType.INTERNAL && transactional());
}
/**
@@ -1977,4 +1977,4 @@ public class GridCacheContext<K, V> implements Externalizable {
@Override public String toString() {
return "GridCacheContext: " + name();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index a935b26..b55c84d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -643,7 +643,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (!cctx.discovery().alive(node.id()) || !cctx.discovery().pingNode(node.id()))
throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e);
- if (cnt == retryCnt)
+ if (cnt == retryCnt || cctx.kernalContext().isStopping())
throw e;
else if (log.isDebugEnabled())
log.debug("Failed to send message to node (will retry): " + node.id());
@@ -1107,4 +1107,4 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
return res;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
index d1393ce..c2102bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
@@ -527,7 +527,8 @@ public final class GridCacheMvcc {
/*reenter*/false,
tx,
implicitSingle,
- /*near-local*/false, dhtLoc
+ /*near-local*/false,
+ dhtLoc
);
cctx.mvcc().addLocal(cand);
@@ -1271,4 +1272,4 @@ public final class GridCacheMvcc {
@Override public String toString() { // Synchronize to ensure one-thread at a time.
return S.toString(GridCacheMvcc.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index eaaff67..d6f6a18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -167,7 +167,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
try {
cctx.io().send(nearNodeId, req, tx.ioPolicy());
}
- catch (ClusterTopologyCheckedException e) {
+ catch (ClusterTopologyCheckedException ignore) {
fut.onNodeLeft();
}
catch (IgniteCheckedException e) {
@@ -374,14 +374,11 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
if (isMini(fut)) {
MiniFuture f = (MiniFuture)fut;
- if (f.nodeId().equals(nodeId)) {
+ if (f.nodeId().equals(nodeId))
f.onNodeLeft();
-
- return true;
- }
}
- return false;
+ return true;
}
/** {@inheritDoc} */
@@ -509,4 +506,4 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
return S.toString(MiniFuture.class, this, "done", isDone(), "err", error());
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index 3fada86..6904e56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -186,8 +186,8 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
long timeout,
boolean tx,
boolean implicitSingle,
- @Nullable GridCacheVersion owned) throws GridDistributedLockCancelledException,
- GridCacheEntryRemovedException {
+ @Nullable GridCacheVersion owned
+ ) throws GridDistributedLockCancelledException, GridCacheEntryRemovedException {
GridCacheMvccCandidate prev;
GridCacheMvccCandidate owner;
@@ -872,4 +872,4 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
@Override public synchronized String toString() {
return S.toString(GridDistributedCacheEntry.class, this, super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index fdd59be..ddf6799 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -210,17 +210,6 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage {
return commit ? syncCommit : syncRollback;
}
- /** {@inheritDoc}
- * @param ctx*/
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
- super.prepareMarshal(ctx);
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(ctx, ldr);
- }
-
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -426,4 +415,4 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage {
return GridToStringBuilder.toString(GridDistributedTxFinishRequest.class, this,
"super", super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 41f9872..c930d88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -458,17 +458,17 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
assert txEntry != null : "Missing transaction entry for tx: " + this;
while (true) {
- GridCacheEntryEx Entry = txEntry.cached();
+ GridCacheEntryEx entry = txEntry.cached();
- assert Entry != null : "Missing cached entry for transaction entry: " + txEntry;
+ assert entry != null : "Missing cached entry for transaction entry: " + txEntry;
try {
GridCacheVersion ver = txEntry.explicitVersion() != null ? txEntry.explicitVersion() : xidVer;
// If locks haven't been acquired yet, keep waiting.
- if (!Entry.lockedBy(ver)) {
+ if (!entry.lockedBy(ver)) {
if (log.isDebugEnabled())
- log.debug("Transaction does not own lock for entry (will wait) [entry=" + Entry +
+ log.debug("Transaction does not own lock for entry (will wait) [entry=" + entry +
", tx=" + this + ']');
return;
@@ -802,4 +802,4 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
@Override public String toString() {
return GridToStringBuilder.toString(GridDistributedTxRemoteAdapter.class, this, "super", super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index d81b72c..b9514a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -709,337 +709,311 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
@Nullable final CacheEntryPredicate[] filter0) {
final List<KeyCacheObject> keys = req.keys();
- IgniteInternalFuture<Object> keyFut = null;
+ CacheEntryPredicate[] filter = filter0;
- if (req.onePhaseCommit()) {
- boolean forceKeys = req.hasTransforms() || req.filter() != null;
+ // Set message into thread context.
+ GridDhtTxLocal tx = null;
- if (!forceKeys) {
- for (int i = 0; i < req.keysCount() && !forceKeys; i++)
- forceKeys |= req.returnValue(i);
- }
-
- if (forceKeys)
- keyFut = ctx.dht().dhtPreloader().request(keys, req.topologyVersion());
- }
+ try {
+ int cnt = keys.size();
- if (keyFut == null)
- keyFut = new GridFinishedFuture<>();
+ if (req.inTx()) {
+ GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version());
- return new GridEmbeddedFuture<>(keyFut,
- new C2<Object, Exception, IgniteInternalFuture<GridNearLockResponse>>() {
- @Override public IgniteInternalFuture<GridNearLockResponse> apply(Object o, Exception exx) {
- if (exx != null)
- return new GridDhtFinishedFuture<>(exx);
+ if (dhtVer != null)
+ tx = ctx.tm().tx(dhtVer);
+ }
- CacheEntryPredicate[] filter = filter0;
+ final List<GridCacheEntryEx> entries = new ArrayList<>(cnt);
- // Set message into thread context.
- GridDhtTxLocal tx = null;
+ // Unmarshal filter first.
+ if (filter == null)
+ filter = req.filter();
- try {
- int cnt = keys.size();
+ GridDhtLockFuture fut = null;
- if (req.inTx()) {
- GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version());
+ if (!req.inTx()) {
+ GridDhtPartitionTopology top = null;
- if (dhtVer != null)
- tx = ctx.tm().tx(dhtVer);
- }
+ if (req.firstClientRequest()) {
+ assert CU.clientNode(nearNode);
- final List<GridCacheEntryEx> entries = new ArrayList<>(cnt);
+ top = topology();
- // Unmarshal filter first.
- if (filter == null)
- filter = req.filter();
+ topology().readLock();
+ }
- GridDhtLockFuture fut = null;
+ try {
+ if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) {
+ if (log.isDebugEnabled()) {
+ log.debug("Client topology version mismatch, need remap lock request [" +
+ "reqTopVer=" + req.topologyVersion() +
+ ", locTopVer=" + top.topologyVersion() +
+ ", req=" + req + ']');
+ }
- if (!req.inTx()) {
- GridDhtPartitionTopology top = null;
+ GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
+ req,
+ top.topologyVersion());
- if (req.firstClientRequest()) {
- assert CU.clientNode(nearNode);
+ return new GridFinishedFuture<>(res);
+ }
- top = topology();
+ fut = new GridDhtLockFuture(ctx,
+ nearNode.id(),
+ req.version(),
+ req.topologyVersion(),
+ cnt,
+ req.txRead(),
+ req.needReturnValue(),
+ req.timeout(),
+ tx,
+ req.threadId(),
+ req.accessTtl(),
+ filter,
+ req.skipStore());
+
+ // Add before mapping.
+ if (!ctx.mvcc().addFuture(fut))
+ throw new IllegalStateException("Duplicate future ID: " + fut);
+ }
+ finally {
+ if (top != null)
+ top.readUnlock();
+ }
+ }
- topology().readLock();
- }
+ boolean timedout = false;
- try {
- if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) {
- if (log.isDebugEnabled()) {
- log.debug("Client topology version mismatch, need remap lock request [" +
- "reqTopVer=" + req.topologyVersion() +
- ", locTopVer=" + top.topologyVersion() +
- ", req=" + req + ']');
- }
+ for (KeyCacheObject key : keys) {
+ if (timedout)
+ break;
- GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
- req,
- top.topologyVersion());
+ while (true) {
+ // Specify topology version to make sure containment is checked
+ // based on the requested version, not the latest.
+ GridDhtCacheEntry entry = entryExx(key, req.topologyVersion());
- return new GridFinishedFuture<>(res);
- }
+ try {
+ if (fut != null) {
+ // This method will add local candidate.
+ // Entry cannot become obsolete after this method succeeded.
+ fut.addEntry(key == null ? null : entry);
- fut = new GridDhtLockFuture(ctx,
- nearNode.id(),
- req.version(),
- req.topologyVersion(),
- cnt,
- req.txRead(),
- req.needReturnValue(),
- req.timeout(),
- tx,
- req.threadId(),
- req.accessTtl(),
- filter,
- req.skipStore());
+ if (fut.isDone()) {
+ timedout = true;
- // Add before mapping.
- if (!ctx.mvcc().addFuture(fut))
- throw new IllegalStateException("Duplicate future ID: " + fut);
- }
- finally {
- if (top != null)
- top.readUnlock();
+ break;
}
}
- boolean timedout = false;
-
- for (KeyCacheObject key : keys) {
- if (timedout)
- break;
+ entries.add(entry);
- while (true) {
- // Specify topology version to make sure containment is checked
- // based on the requested version, not the latest.
- GridDhtCacheEntry entry = entryExx(key, req.topologyVersion());
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry when adding lock (will retry): " + entry);
+ }
+ catch (GridDistributedLockCancelledException e) {
+ if (log.isDebugEnabled())
+ log.debug("Got lock request for cancelled lock (will ignore): " +
+ entry);
- try {
- if (fut != null) {
- // This method will add local candidate.
- // Entry cannot become obsolete after this method succeeded.
- fut.addEntry(key == null ? null : entry);
+ fut.onError(e);
- if (fut.isDone()) {
- timedout = true;
+ return new GridDhtFinishedFuture<>(e);
+ }
+ }
+ }
- break;
- }
- }
+ // Handle implicit locks for pessimistic transactions.
+ if (req.inTx()) {
+ if (tx == null) {
+ GridDhtPartitionTopology top = null;
- entries.add(entry);
+ if (req.firstClientRequest()) {
+ assert CU.clientNode(nearNode);
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry when adding lock (will retry): " + entry);
- }
- catch (GridDistributedLockCancelledException e) {
- if (log.isDebugEnabled())
- log.debug("Got lock request for cancelled lock (will ignore): " +
- entry);
+ top = topology();
- fut.onError(e);
+ topology().readLock();
+ }
- return new GridDhtFinishedFuture<>(e);
- }
+ try {
+ if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) {
+ if (log.isDebugEnabled()) {
+ log.debug("Client topology version mismatch, need remap lock request [" +
+ "reqTopVer=" + req.topologyVersion() +
+ ", locTopVer=" + top.topologyVersion() +
+ ", req=" + req + ']');
}
- }
- // Handle implicit locks for pessimistic transactions.
- if (req.inTx()) {
- if (tx == null) {
- GridDhtPartitionTopology top = null;
+ GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
+ req,
+ top.topologyVersion());
- if (req.firstClientRequest()) {
- assert CU.clientNode(nearNode);
+ return new GridFinishedFuture<>(res);
+ }
- top = topology();
+ tx = new GridDhtTxLocal(
+ ctx.shared(),
+ nearNode.id(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.threadId(),
+ req.implicitTx(),
+ req.implicitSingleTx(),
+ ctx.systemTx(),
+ false,
+ ctx.ioPolicy(),
+ PESSIMISTIC,
+ req.isolation(),
+ req.timeout(),
+ req.isInvalidate(),
+ !req.skipStore(),
+ false,
+ req.txSize(),
+ null,
+ req.subjectId(),
+ req.taskNameHash());
- topology().readLock();
- }
+ tx.syncCommit(req.syncCommit());
- try {
- if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) {
- if (log.isDebugEnabled()) {
- log.debug("Client topology version mismatch, need remap lock request [" +
- "reqTopVer=" + req.topologyVersion() +
- ", locTopVer=" + top.topologyVersion() +
- ", req=" + req + ']');
- }
+ tx = ctx.tm().onCreated(null, tx);
- GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
- req,
- top.topologyVersion());
+ if (tx == null || !tx.init()) {
+ String msg = "Failed to acquire lock (transaction has been completed): " +
+ req.version();
- return new GridFinishedFuture<>(res);
- }
+ U.warn(log, msg);
- tx = new GridDhtTxLocal(
- ctx.shared(),
- nearNode.id(),
- req.version(),
- req.futureId(),
- req.miniId(),
- req.threadId(),
- req.implicitTx(),
- req.implicitSingleTx(),
- ctx.systemTx(),
- false,
- ctx.ioPolicy(),
- PESSIMISTIC,
- req.isolation(),
- req.timeout(),
- req.isInvalidate(),
- true,
- req.txSize(),
- null,
- req.subjectId(),
- req.taskNameHash());
-
- tx.syncCommit(req.syncCommit());
-
- tx = ctx.tm().onCreated(null, tx);
-
- if (tx == null || !tx.init()) {
- String msg = "Failed to acquire lock (transaction has been completed): " +
- req.version();
-
- U.warn(log, msg);
-
- if (tx != null)
- tx.rollback();
-
- return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
- }
+ if (tx != null)
+ tx.rollback();
- tx.topologyVersion(req.topologyVersion());
- }
- finally {
- if (top != null)
- top.readUnlock();
- }
- }
+ return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
+ }
- ctx.tm().txContext(tx);
+ tx.topologyVersion(req.topologyVersion());
+ }
+ finally {
+ if (top != null)
+ top.readUnlock();
+ }
+ }
- if (log.isDebugEnabled())
- log.debug("Performing DHT lock [tx=" + tx + ", entries=" + entries + ']');
+ ctx.tm().txContext(tx);
- IgniteInternalFuture<GridCacheReturn> txFut = tx.lockAllAsync(
- cacheCtx,
+ if (log.isDebugEnabled())
+ log.debug("Performing DHT lock [tx=" + tx + ", entries=" + entries + ']');
+
+ IgniteInternalFuture<GridCacheReturn> txFut = tx.lockAllAsync(
+ cacheCtx,
+ entries,
+ req.messageId(),
+ req.txRead(),
+ req.needReturnValue(),
+ req.accessTtl(),
+ req.skipStore());
+
+ final GridDhtTxLocal t = tx;
+
+ return new GridDhtEmbeddedFuture(
+ txFut,
+ new C2<GridCacheReturn, Exception, IgniteInternalFuture<GridNearLockResponse>>() {
+ @Override public IgniteInternalFuture<GridNearLockResponse> apply(
+ GridCacheReturn o, Exception e) {
+ if (e != null)
+ e = U.unwrap(e);
+
+ assert !t.empty();
+
+ // Create response while holding locks.
+ final GridNearLockResponse resp = createLockReply(nearNode,
entries,
- req.onePhaseCommit(),
- req.messageId(),
- req.txRead(),
- req.needReturnValue(),
- req.accessTtl(),
- req.skipStore());
+ req,
+ t,
+ t.xidVersion(),
+ e);
+
+ if (resp.error() == null && t.onePhaseCommit()) {
+ assert t.implicit();
+
+ return t.commitAsync().chain(
+ new C1<IgniteInternalFuture<IgniteInternalTx>, GridNearLockResponse>() {
+ @Override public GridNearLockResponse apply(IgniteInternalFuture<IgniteInternalTx> f) {
+ try {
+ // Check for error.
+ f.get();
+ }
+ catch (IgniteCheckedException e1) {
+ resp.error(e1);
+ }
- final GridDhtTxLocal t = tx;
-
- return new GridDhtEmbeddedFuture(
- txFut,
- new C2<GridCacheReturn, Exception, IgniteInternalFuture<GridNearLockResponse>>() {
- @Override public IgniteInternalFuture<GridNearLockResponse> apply(
- GridCacheReturn o, Exception e) {
- if (e != null)
- e = U.unwrap(e);
-
- assert !t.empty();
-
- // Create response while holding locks.
- final GridNearLockResponse resp = createLockReply(nearNode,
- entries,
- req,
- t,
- t.xidVersion(),
- e);
-
- if (resp.error() == null && t.onePhaseCommit()) {
- assert t.implicit();
-
- return t.commitAsync().chain(
- new C1<IgniteInternalFuture<IgniteInternalTx>, GridNearLockResponse>() {
- @Override public GridNearLockResponse apply(IgniteInternalFuture<IgniteInternalTx> f) {
- try {
- // Check for error.
- f.get();
- }
- catch (IgniteCheckedException e1) {
- resp.error(e1);
- }
-
- sendLockReply(nearNode, t, req, resp);
-
- return resp;
- }
- });
- }
- else {
sendLockReply(nearNode, t, req, resp);
- return new GridFinishedFuture<>(resp);
+ return resp;
}
- }
- }
- );
+ });
+ }
+ else {
+ sendLockReply(nearNode, t, req, resp);
+
+ return new GridFinishedFuture<>(resp);
+ }
}
- else {
- assert fut != null;
+ }
+ );
+ }
+ else {
+ assert fut != null;
- // This will send remote messages.
- fut.map();
+ // This will send remote messages.
+ fut.map();
- final GridCacheVersion mappedVer = fut.version();
+ final GridCacheVersion mappedVer = fut.version();
- return new GridDhtEmbeddedFuture<>(
- new C2<Boolean, Exception, GridNearLockResponse>() {
- @Override public GridNearLockResponse apply(Boolean b, Exception e) {
- if (e != null)
- e = U.unwrap(e);
- else if (!b)
- e = new GridCacheLockTimeoutException(req.version());
+ return new GridDhtEmbeddedFuture<>(
+ new C2<Boolean, Exception, GridNearLockResponse>() {
+ @Override public GridNearLockResponse apply(Boolean b, Exception e) {
+ if (e != null)
+ e = U.unwrap(e);
+ else if (!b)
+ e = new GridCacheLockTimeoutException(req.version());
- GridNearLockResponse res = createLockReply(nearNode,
- entries,
- req,
- null,
- mappedVer,
- e);
+ GridNearLockResponse res = createLockReply(nearNode,
+ entries,
+ req,
+ null,
+ mappedVer,
+ e);
- sendLockReply(nearNode, null, req, res);
+ sendLockReply(nearNode, null, req, res);
- return res;
- }
- },
- fut);
+ return res;
}
- }
- catch (IgniteCheckedException e) {
- String err = "Failed to unmarshal at least one of the keys for lock request message: " + req;
-
- U.error(log, err, e);
+ },
+ fut);
+ }
+ }
+ catch (IgniteCheckedException | RuntimeException e) {
+ String err = "Failed to unmarshal at least one of the keys for lock request message: " + req;
- if (tx != null) {
- try {
- tx.rollback();
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to rollback the transaction: " + tx, ex);
- }
- }
+ U.error(log, err, e);
- return new GridDhtFinishedFuture<>(
- new IgniteCheckedException(err, e));
- }
+ if (tx != null) {
+ try {
+ tx.rollback();
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to rollback the transaction: " + tx, ex);
}
}
- );
+
+ return new GridDhtFinishedFuture<>(
+ new IgniteCheckedException(err, e));
+ }
}
/**
@@ -1626,4 +1600,4 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (nearEntry != null)
nearEntry.markObsolete(ctx.versions().next());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index a7ec20f..79bccc2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -219,7 +219,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
@Override public boolean onDone(IgniteInternalTx tx, Throwable err) {
if (initialized() || err != null) {
if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING))
- this.tx.tmCommit();
+ this.tx.tmFinish(err == null);
Throwable e = this.err.get();
@@ -255,7 +255,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
/**
* Initializes future.
*/
- @SuppressWarnings("SimplifiableIfStatement")
+ @SuppressWarnings({"SimplifiableIfStatement", "IfMayBeConditional"})
public void finish() {
boolean sync;
@@ -277,7 +277,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
* @param nodes Nodes.
* @return {@code True} in case there is at least one synchronous {@code MiniFuture} to wait for.
*/
- private boolean rollbackLockTransactions(Set<ClusterNode> nodes) {
+ private boolean rollbackLockTransactions(Collection<ClusterNode> nodes) {
assert !commit;
assert !F.isEmpty(nodes);
@@ -399,6 +399,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.subjectId(),
tx.taskNameHash());
+ req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion());
+
try {
cctx.io().send(n, req, tx.ioPolicy());
@@ -450,8 +452,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.subjectId(),
tx.taskNameHash());
- if (tx.onePhaseCommit())
- req.writeVersion(tx.writeVersion());
+ req.writeVersion(tx.writeVersion());
try {
cctx.io().send(nearMapping.node(), req, tx.ioPolicy());
@@ -516,7 +517,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
/**
* @param node Node.
*/
- public MiniFuture(ClusterNode node) {
+ private MiniFuture(ClusterNode node) {
this.node = node;
}
@@ -582,4 +583,4 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index a9cb299..f859314 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -63,6 +63,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
@GridDirectCollection(GridCacheVersion.class)
private Collection<GridCacheVersion> pendingVers;
+ /** Check committed flag. */
+ private boolean checkCommitted;
+
/** One phase commit write version. */
private GridCacheVersion writeVer;
@@ -126,8 +129,21 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
@Nullable UUID subjId,
int taskNameHash
) {
- super(xidVer, futId, commitVer, threadId, commit, invalidate, sys, plc, syncCommit, syncRollback, baseVer,
- committedVers, rolledbackVers, txSize);
+ super(
+ xidVer,
+ futId,
+ commitVer,
+ threadId,
+ commit,
+ invalidate,
+ sys,
+ plc,
+ syncCommit,
+ syncRollback,
+ baseVer,
+ committedVers,
+ rolledbackVers,
+ txSize);
assert miniId != null;
assert nearNodeId != null;
@@ -221,6 +237,20 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
return pendingVers == null ? Collections.<GridCacheVersion>emptyList() : pendingVers;
}
+ /**
+ * @return Check committed flag.
+ */
+ public boolean checkCommitted() {
+ return checkCommitted;
+ }
+
+ /**
+ * @param checkCommitted Check committed flag.
+ */
+ public void checkCommitted(boolean checkCommitted) {
+ this.checkCommitted = checkCommitted;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtTxFinishRequest.class, this, super.toString());
@@ -242,54 +272,60 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
switch (writer.state()) {
case 18:
- if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
+ if (!writer.writeBoolean("checkCommitted", checkCommitted))
return false;
writer.incrementState();
case 19:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
return false;
writer.incrementState();
case 20:
- if (!writer.writeUuid("nearNodeId", nearNodeId))
+ if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
case 21:
- if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
+ if (!writer.writeUuid("nearNodeId", nearNodeId))
return false;
writer.incrementState();
case 22:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 23:
- if (!writer.writeBoolean("sysInvalidate", sysInvalidate))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 24:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeBoolean("sysInvalidate", sysInvalidate))
return false;
writer.incrementState();
case 25:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
case 26:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 27:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -312,6 +348,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
switch (reader.state()) {
case 18:
+ checkCommitted = reader.readBoolean("checkCommitted");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 19:
byte isolationOrd;
isolationOrd = reader.readByte("isolation");
@@ -323,7 +367,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 19:
+ case 20:
miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
@@ -331,7 +375,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 20:
+ case 21:
nearNodeId = reader.readUuid("nearNodeId");
if (!reader.isLastRead())
@@ -339,7 +383,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 21:
+ case 22:
pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -347,7 +391,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 22:
+ case 23:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -355,7 +399,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 23:
+ case 24:
sysInvalidate = reader.readBoolean("sysInvalidate");
if (!reader.isLastRead())
@@ -363,7 +407,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 24:
+ case 25:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -371,7 +415,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 25:
+ case 26:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -379,7 +423,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 26:
+ case 27:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -399,6 +443,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 27;
+ return 28;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index d696c05..ec0f234 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -19,6 +19,10 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.io.Externalizable;
import java.nio.ByteBuffer;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -36,6 +40,16 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/** Mini future ID. */
private IgniteUuid miniId;
+ /** Error. */
+ @GridDirectTransient
+ private Throwable checkCommittedErr;
+
+ /** Serialized error. */
+ private byte[] checkCommittedErrBytes;
+
+ /** Flag indicating if this is a check-committed response. */
+ private boolean checkCommitted;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -63,6 +77,51 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
return miniId;
}
+ /**
+ * @return Error for check committed backup requests.
+ */
+ public Throwable checkCommittedError() {
+ return checkCommittedErr;
+ }
+
+ /**
+ * @param checkCommittedErr Error for check committed backup requests.
+ */
+ public void checkCommittedError(Throwable checkCommittedErr) {
+ this.checkCommittedErr = checkCommittedErr;
+ }
+
+ /**
+ * @return Check committed flag.
+ */
+ public boolean checkCommitted() {
+ return checkCommitted;
+ }
+
+ /**
+ * @param checkCommitted Check committed flag.
+ */
+ public void checkCommitted(boolean checkCommitted) {
+ this.checkCommitted = checkCommitted;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (checkCommittedErr != null)
+ checkCommittedErrBytes = ctx.marshaller().marshal(checkCommittedErr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr)
+ throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (checkCommittedErrBytes != null)
+ checkCommittedErr = ctx.marshaller().unmarshal(checkCommittedErrBytes, ldr);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtTxFinishResponse.class, this, super.toString());
@@ -84,6 +143,18 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
switch (writer.state()) {
case 5:
+ if (!writer.writeBoolean("checkCommitted", checkCommitted))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeByteArray("checkCommittedErrBytes", checkCommittedErrBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
if (!writer.writeIgniteUuid("miniId", miniId))
return false;
@@ -106,6 +177,22 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
switch (reader.state()) {
case 5:
+ checkCommitted = reader.readBoolean("checkCommitted");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ checkCommittedErrBytes = reader.readByteArray("checkCommittedErrBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
@@ -125,6 +212,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 6;
+ return 8;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index b23b3e1..4f8469f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -128,6 +128,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
long timeout,
boolean invalidate,
boolean storeEnabled,
+ boolean onePhaseCommit,
int txSize,
Map<UUID, Collection<UUID>> txNodes,
UUID subjId,
@@ -146,6 +147,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
timeout,
invalidate,
storeEnabled,
+ onePhaseCommit,
txSize,
subjId,
taskNameHash);
@@ -700,4 +702,4 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
@Override public String toString() {
return GridToStringBuilder.toString(GridDhtTxLocal.class, this, "super", super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 762d26f..8c7d985 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -98,6 +98,9 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/** Versions of pending locks for entries of this tx. */
private Collection<GridCacheVersion> pendingVers;
+ /** Flag indicating that originating node has near cache. */
+ private boolean nearOnOriginatingNode;
+
/** Nodes where transactions were started on lock step. */
private Set<ClusterNode> lockTxNodes;
@@ -132,12 +135,28 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
long timeout,
boolean invalidate,
boolean storeEnabled,
+ boolean onePhaseCommit,
int txSize,
@Nullable UUID subjId,
int taskNameHash
) {
- super(cctx, xidVer, implicit, implicitSingle, sys, plc, concurrency, isolation, timeout, invalidate,
- storeEnabled, txSize, subjId, taskNameHash);
+ super(
+ cctx,
+ xidVer,
+ implicit,
+ implicitSingle,
+ sys,
+ plc,
+ concurrency,
+ isolation,
+ timeout,
+ invalidate,
+ storeEnabled,
+ onePhaseCommit,
+ txSize,
+ subjId,
+ taskNameHash
+ );
assert cctx != null;
@@ -161,6 +180,29 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
}
/**
+ * Sets flag that indicates that originating node has a near cache that participates in this transaction.
+ *
+ * @param hasNear Has near cache flag.
+ */
+ public void nearOnOriginatingNode(boolean hasNear) {
+ nearOnOriginatingNode = hasNear;
+ }
+
+ /**
+ * @return {@code True} if explicit lock transaction.
+ */
+ public boolean explicitLock() {
+ return explicitLock;
+ }
+
+ /**
+ * @param explicitLock Explicit lock flag.
+ */
+ public void explicitLock(boolean explicitLock) {
+ this.explicitLock = explicitLock;
+ }
+
+ /**
* @return Nodes where transactions were started on lock step.
*/
@Nullable public Set<ClusterNode> lockTransactionNodes() {
@@ -229,20 +271,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
}
/**
- * @return Explicit lock flag.
- */
- public boolean explicitLock() {
- return explicitLock;
- }
-
- /**
- * @param explicitLock Explicit lock flag.
- */
- public void explicitLock(boolean explicitLock) {
- this.explicitLock = explicitLock;
- }
-
- /**
* @return DHT thread ID.
*/
long dhtThreadId() {
@@ -570,7 +598,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/**
* @param cacheCtx Cache context.
* @param entries Entries to lock.
- * @param onePhaseCommit One phase commit flag.
* @param msgId Message ID.
* @param read Read flag.
* @param accessTtl TTL for read operation.
@@ -582,7 +609,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
IgniteInternalFuture<GridCacheReturn> lockAllAsync(
GridCacheContext cacheCtx,
List<GridCacheEntryEx> entries,
- boolean onePhaseCommit,
long msgId,
final boolean read,
final boolean needRetVal,
@@ -864,13 +890,14 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
* @return {@code True} if transaction is finished on prepare step.
*/
protected final boolean commitOnPrepare() {
- return onePhaseCommit() && !near();
+ return onePhaseCommit() && !near() && !nearOnOriginatingNode;
}
/**
* @param prepFut Prepare future.
* @return If transaction if finished on prepare step returns future which is completed after transaction finish.
*/
+ @SuppressWarnings("TypeMayBeWeakened")
protected final IgniteInternalFuture<GridNearTxPrepareResponse> chainOnePhasePrepare(
final GridDhtTxPrepareFuture prepFut) {
if (commitOnPrepare()) {
@@ -901,4 +928,4 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(),
"dhtNodes", dhtMap.keySet(), "explicitLock", explicitLock, "super", super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 6e8460f..89fc0ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFutureCancelledException;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -338,15 +339,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters());
if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) {
- CacheObject val;
-
cached.unswap(retVal);
boolean readThrough = (retVal || hasFilters) &&
cacheCtx.config().isLoadPreviousValue() &&
!txEntry.skipStore();
- val = cached.innerGet(
+ CacheObject val = cached.innerGet(
tx,
/*swap*/true,
readThrough,
@@ -561,7 +560,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (tx.optimistic())
tx.clearPrepareFuture(this);
- if (tx.onePhaseCommit()) {
+ // Do not commit one-phase commit transaction if originating node has near cache enabled.
+ if (tx.onePhaseCommit() && tx.commitOnPrepare()) {
assert last;
// Must create prepare response before transaction is committed to grab correct return value.
@@ -639,8 +639,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @throws IgniteCheckedException If failed to send response.
*/
private void sendPrepareResponse(GridNearTxPrepareResponse res) throws IgniteCheckedException {
- if (!tx.nearNodeId().equals(cctx.localNodeId()))
+ if (!tx.nearNodeId().equals(cctx.localNodeId())) {
+ Throwable err = this.err.get();
+
+ if (err != null && err instanceof IgniteFutureCancelledException)
+ return;
+
cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy());
+ }
}
/**
@@ -902,6 +908,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
// We are holding transaction-level locks for entries here, so we can get next write version.
onEntriesLocked();
+ // We are holding transaction-level locks for entries here, so we can get next write version.
tx.writeVersion(cctx.versions().next(tx.topologyVersion()));
{
@@ -978,9 +985,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (entry.explicitVersion() == null) {
GridCacheMvccCandidate added = cached.candidate(version());
- assert added == null || added.dhtLocal() :
- "Got non-dht-local candidate for prepare future " +
- "[added=" + added + ", entry=" + entry + ']';
+ assert added != null : "Null candidate for non-group-lock entry " +
+ "[added=" + added + ", entry=" + entry + ']';
+ assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
+ "[added=" + added + ", entry=" + entry + ']';
if (added != null && added.ownerVersion() != null)
req.owned(entry.txKey(), added.ownerVersion());
@@ -1070,8 +1078,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (entry.explicitVersion() == null) {
GridCacheMvccCandidate added = entry.cached().candidate(version());
- assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
- "[added=" + added + ", entry=" + entry + ']';
+ assert added != null : "Null candidate for non-group-lock entry " +
+ "[added=" + added + ", entry=" + entry + ']';
+ assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
+ "[added=" + added + ", entry=" + entry + ']';
if (added != null && added.ownerVersion() != null)
req.owned(entry.txKey(), added.ownerVersion());
@@ -1473,4 +1483,4 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 2ff34a9..f8be2a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -111,8 +111,22 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
@Nullable UUID subjId,
int taskNameHash
) {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout,
- txSize, subjId, taskNameHash);
+ super(
+ ctx,
+ nodeId,
+ rmtThreadId,
+ xidVer,
+ commitVer,
+ sys,
+ plc,
+ concurrency,
+ isolation,
+ invalidate,
+ timeout,
+ txSize,
+ subjId,
+ taskNameHash
+ );
assert nearNodeId != null;
assert rmtFutId != null;
@@ -168,8 +182,22 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
@Nullable UUID subjId,
int taskNameHash
) {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout,
- txSize, subjId, taskNameHash);
+ super(
+ ctx,
+ nodeId,
+ rmtThreadId,
+ xidVer,
+ commitVer,
+ sys,
+ plc,
+ concurrency,
+ isolation,
+ invalidate,
+ timeout,
+ txSize,
+ subjId,
+ taskNameHash
+ );
assert nearNodeId != null;
assert rmtFutId != null;
@@ -224,7 +252,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
/**
* @return Near node ID.
*/
- UUID nearNodeId() {
+ public UUID nearNodeId() {
return nearNodeId;
}
@@ -334,4 +362,4 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
@Override public String toString() {
return GridToStringBuilder.toString(GridDhtTxRemote.class, this, "super", super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 3ddf6d3..0202c53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -306,7 +306,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
) {
if (CU.affinityNodes(cctx, topVer).isEmpty()) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid)."));
+ "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'));
return;
}
@@ -816,4 +816,4 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
return S.toString(MiniFuture.class, this);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 65b1d38..d93f68f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -279,7 +279,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
public void map() {
AffinityTopologyVersion topVer = null;
- IgniteInternalTx tx = cctx.tm().anyActiveThreadTx();
+ IgniteInternalTx tx = cctx.tm().anyActiveThreadTx(null);
if (tx != null && tx.topologyVersionSnapshot() != null)
topVer = tx.topologyVersionSnapshot();
@@ -1188,4 +1188,4 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
public String toString() {
return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index f7093b8..596ec77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -547,6 +548,14 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
// Obtain the topology version to use.
AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
+ // If there is another system transaction in progress, use its topology version to prevent deadlock.
+ if (topVer == null && tx != null && tx.system()) {
+ IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
+
+ if (tx0 != null)
+ topVer = tx0.topologyVersionSnapshot();
+ }
+
if (topVer != null && tx != null)
tx.topologyVersion(topVer);
@@ -1425,4 +1434,4 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
return S.toString(MiniFuture.class, this, "node", node.id(), "super", super.toString());
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 26276f0..f3e5ca3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCa
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -685,6 +686,14 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
// Obtain the topology version to use.
AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+ // If there is another system transaction in progress, use its topology version to prevent deadlock.
+ if (topVer == null && tx != null && tx.system()) {
+ IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
+
+ if (tx0 != null)
+ topVer = tx0.topologyVersionSnapshot();
+ }
+
if (topVer != null && tx != null)
tx.topologyVersion(topVer);
@@ -1566,4 +1575,4 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
return S.toString(MiniFuture.class, this, "node", node.id(), "super", super.toString());
}
}
-}
\ No newline at end of file
+}