You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/13 16:35:36 UTC
[44/50] [abbrv] ignite git commit: ignite-4768
ignite-4768
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/baeb2036
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/baeb2036
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/baeb2036
Branch: refs/heads/ignite-4768-1
Commit: baeb2036e4f29ef1336cbfe5d8f8fcac012cbfb4
Parents: df80606
Author: sboikov <sb...@gridgain.com>
Authored: Mon Mar 13 15:31:52 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Mar 13 18:45:44 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 1 +
.../processors/cache/GridCacheIoManager.java | 4 +-
.../processors/cache/GridCacheMvccManager.java | 7 ++
.../distributed/GridCacheTxFinishSync.java | 6 +-
.../GridCacheTxRecoveryResponse.java | 2 +
.../GridDistributedTxFinishRequest.java | 8 ++
.../GridDistributedTxPrepareRequest.java | 13 ++-
.../GridDistributedTxPrepareResponse.java | 1 +
.../distributed/dht/GridDhtTxFinishFuture.java | 7 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 64 ++----------
.../distributed/dht/GridDhtTxLocalAdapter.java | 65 ++++++++++--
.../distributed/dht/GridDhtTxPrepareFuture.java | 37 +++----
...arOptimisticSerializableTxPrepareFuture.java | 100 ++++++++++++++++---
.../GridNearPessimisticTxPrepareFuture.java | 12 +--
.../near/GridNearTxFinishFuture.java | 5 +-
.../cache/distributed/near/GridNearTxLocal.java | 10 +-
.../cache/transactions/IgniteTxHandler.java | 19 ++--
17 files changed, 215 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 2a6706e..0ee02fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1155,6 +1155,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
String msg = NL +
"Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL +
+ "Futures " + ctx.cache().context().mvcc().activeFuturesCount()+ NL +
" ^-- Node [id=" + id + ", name=" + name() + ", uptime=" + getUpTimeFormatted() + "]" + NL +
" ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL +
" ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" +
http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 2c255a5..e433825 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -367,8 +367,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
unmarshall(nodeId, cacheMsg);
- if (!cacheMsg.partitionExchangeMessage())
- log.info("Message [from=" + nodeId + ", msg=" + cacheMsg + ']');
+// if (!cacheMsg.partitionExchangeMessage())
+// log.info("Message [from=" + nodeId + ", msg=" + cacheMsg + ']');
if (cacheMsg.classError() != null)
processFailedMessage(nodeId, cacheMsg, c);
http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 4ec13fc..7d14fa9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -312,6 +312,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @return Current size of the {@code mvccFuts} collection (TODO IGNITE-4768).
+ */
+ public int activeFuturesCount() {
+ return mvccFuts.size();
+ }
+
+ /**
* @param leftNodeId Left node ID.
* @param topVer Topology version.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
index 1e323d0..64d3122 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
@@ -137,7 +137,7 @@ public class GridCacheTxFinishSync<K, V> {
/**
* @param nodeId Node ID request being sent to.
*/
- public void onSend(UUID nodeId) {
+ void onSend(UUID nodeId) {
TxFinishSync sync = nodeMap.get(nodeId);
if (sync == null) {
@@ -169,7 +169,7 @@ public class GridCacheTxFinishSync<K, V> {
* @param nodeId Node ID to wait ack from.
* @return {@code null} if ack has been received or future that will be completed when ack is received.
*/
- public IgniteInternalFuture<?> awaitAckAsync(UUID nodeId) {
+ IgniteInternalFuture<?> awaitAckAsync(UUID nodeId) {
TxFinishSync sync = nodeMap.get(nodeId);
if (sync == null)
@@ -191,7 +191,7 @@ public class GridCacheTxFinishSync<K, V> {
/**
* @param nodeId Node ID response received from.
*/
- public void onReceive(UUID nodeId) {
+ void onReceive(UUID nodeId) {
TxFinishSync sync = nodeMap.get(nodeId);
if (sync != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
index c087a3d..b5bb1b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -48,6 +49,7 @@ public class GridCacheTxRecoveryResponse extends GridDistributedBaseMessage impl
/** Transient TX state. */
@GridDirectTransient
+ @GridToStringExclude
private IgniteTxState txState;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index c794f96..03d16e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -110,6 +111,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
/** Transient TX state. */
@GridDirectTransient
+ @GridToStringExclude
private IgniteTxState txState;
/**
@@ -571,7 +573,13 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
/** {@inheritDoc} */
@Override public String toString() {
+ StringBuilder flags = new StringBuilder();
+
+ if (dhtReplyNear())
+ appendFlag(flags, "dht2near");
+
return GridToStringBuilder.toString(GridDistributedTxFinishRequest.class, this,
+ "flags", flags.toString(),
"super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index c013c1a..1f06696 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -151,6 +151,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
/** Transient TX state. */
@GridDirectTransient
+ @GridToStringExclude
private IgniteTxState txState;
/** */
@@ -699,15 +700,17 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
StringBuilder flags = new StringBuilder();
if (needReturnValue())
- flags.append("retVal");
+ appendFlag(flags, "retVal");
if (isInvalidate())
- flags.append("invalidate");
+ appendFlag(flags, "invalidate");
if (onePhaseCommit())
- flags.append("onePhase");
+ appendFlag(flags, "onePhase");
if (last())
- flags.append("last");
+ appendFlag(flags, "last");
if (system())
- flags.append("sys");
+ appendFlag(flags, "sys");
+ if (dhtReplyNear())
+ appendFlag(flags, "dht2near");
return GridToStringBuilder.toString(GridDistributedTxPrepareRequest.class, this,
"flags", flags.toString(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index 53a1391..aaa8db5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -49,6 +49,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
/** Transient TX state. */
@GridDirectTransient
+ @GridToStringExclude
private IgniteTxState txState;
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 72a9b73..eb5d58f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -100,6 +100,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
public GridDhtTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridDhtTxLocalAdapter tx, boolean commit) {
super(F.<IgniteInternalTx>identityReducer(tx));
+ assert tx.nearFinishFutureId() != null : tx;
+ assert tx.nearFinishMiniId() != 0 : tx;
+
this.cctx = cctx;
this.tx = tx;
this.commit = commit;
@@ -424,8 +427,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
GridDhtTxFinishRequest req = new GridDhtTxFinishRequest(
tx.nearNodeId(),
- dhtReplyNear ? tx.nearFutureId() : futId,
- dhtReplyNear ? 0 : fut.futureId(),
+ dhtReplyNear ? tx.nearFinishFutureId() : futId,
+ dhtReplyNear ? tx.nearFinishMiniId() : fut.futureId(),
tx.topologyVersion(),
tx.xidVersion(),
tx.commitVersion(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index bff69bc..7ddf415 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -68,18 +68,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
/** */
private UUID nearNodeId;
- /** Near future ID. */
- private IgniteUuid nearFutId;
-
- /** Near future ID. */
- private int nearMiniId;
-
- /** Near future ID. */
- private IgniteUuid nearFinFutId;
-
- /** Near future ID. */
- private int nearFinMiniId;
-
/** Near XID. */
private GridCacheVersion nearXidVer;
@@ -164,8 +152,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
this.nearNodeId = nearNodeId;
this.nearXidVer = nearXidVer;
- this.nearFutId = nearFutId;
- this.nearMiniId = nearMiniId;
+ this.nearPrepFutId = nearFutId;
+ this.nearPrepMiniId = nearMiniId;
this.txNodes = txNodes;
threadId = nearThreadId;
@@ -219,18 +207,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
/** {@inheritDoc} */
- @Override protected IgniteUuid nearFutureId() {
- return nearFutId;
- }
-
- /**
- * @param nearFutId Near future ID.
- */
- public void nearFutureId(IgniteUuid nearFutId) {
- this.nearFutId = nearFutId;
- }
-
- /** {@inheritDoc} */
@Override public boolean dht() {
return true;
}
@@ -240,27 +216,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
return cacheCtx.isDht() && isNearEnabled(cacheCtx) && !cctx.localNodeId().equals(nearNodeId());
}
- /**
- * @return Near future ID.
- */
- public IgniteUuid nearFinishFutureId() {
- return nearFinFutId;
- }
-
- /**
- * @param nearFinFutId Near future ID.
- */
- public void nearFinishFutureId(IgniteUuid nearFinFutId) {
- this.nearFinFutId = nearFinFutId;
- }
-
- /**
- * @param nearFinMiniId Near future mini ID.
- */
- public void nearFinishMiniId(int nearFinMiniId) {
- this.nearFinMiniId = nearFinMiniId;
- }
-
/** {@inheritDoc} */
@Override @Nullable protected IgniteInternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry cached,
IgniteTxEntry entry, AffinityTopologyVersion topVer) {
@@ -301,7 +256,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
null,
Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
0,
- nearMiniId,
null,
true);
}
@@ -317,7 +271,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
cctx,
this,
timeout,
- nearMiniId,
Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
true,
needReturnValue()))) {
@@ -377,7 +330,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
* @param writes Write entries.
* @param verMap Version map.
* @param msgId Message ID.
- * @param nearMiniId Near mini future ID.
* @param txNodes Transaction nodes mapping.
* @param last {@code True} if this is last prepare request.
* @return Future that will be completed when locks are acquired.
@@ -387,7 +339,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
@Nullable Collection<IgniteTxEntry> writes,
Map<IgniteTxKey, GridCacheVersion> verMap,
long msgId,
- int nearMiniId,
Map<UUID, Collection<UUID>> txNodes,
boolean last
) {
@@ -404,14 +355,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
cctx,
this,
timeout,
- nearMiniId,
verMap,
last,
needReturnValue()))) {
GridDhtTxPrepareFuture f = prepFut;
- assert f.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " +
- "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']';
+ assert f.nearMiniId() == nearPrepMiniId : "Wrong near mini id on existing future " +
+ "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearPrepMiniId + ", fut=" + f + ']';
if (timeout == -1)
f.onError(timeoutException());
@@ -420,8 +370,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
}
else {
- assert fut.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " +
- "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']';
+ assert fut.nearMiniId() == nearPrepMiniId : "Wrong near mini id on existing future " +
+ "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearPrepMiniId + ", fut=" + fut + ']';
// Prepare was called explicitly.
return chainOnePhasePrepare(fut);
@@ -619,7 +569,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
"Invalid state [nearFinFutId=" + nearFinFutId + ", isInvalidate=" + isInvalidate() + ", commit=" + commit +
", sysInvalidate=" + isSystemInvalidate() + ", state=" + state() + ']';
- assert nearMiniId != 0;
+ assert nearPrepMiniId != 0;
return super.finish(commit);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 67e1993..81b5208 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -97,6 +97,18 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/** Nodes where transactions were started on lock step. */
private Set<ClusterNode> lockTxNodes;
+ /** Near prepare future ID. */
+ protected IgniteUuid nearPrepFutId;
+
+ /** Prepare future mini ID. */
+ protected int nearPrepMiniId;
+
+ /** Near finish future ID. */
+ protected IgniteUuid nearFinFutId;
+
+ /** Near finish future mini ID. */
+ protected int nearFinMiniId;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -159,9 +171,55 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
}
/**
+ * @return Near prepare future ID.
+ */
+ final IgniteUuid nearPrepareFutureId() {
+ return nearPrepFutId;
+ }
+
+ /**
+ * @param futId Near prepare future ID.
+ * @param miniId Near prepare mini future ID.
+ */
+ public final void nearPrepareFutureId(IgniteUuid futId, int miniId) {
+ this.nearPrepFutId = futId;
+ this.nearPrepMiniId = miniId;
+ }
+
+ /**
+ * @return Near prepare mini future ID.
+ */
+ final int nearPrepareMiniId() {
+ return nearPrepMiniId;
+ }
+
+ /**
+ * @return Near finish future ID.
+ */
+ final IgniteUuid nearFinishFutureId() {
+ return nearFinFutId;
+ }
+
+ /**
+ * @param futId Near finish future ID.
+ * @param miniId Near finish mini future ID.
+ */
+ public final void nearFinishFutureId(IgniteUuid futId, int miniId) {
+ nearFinFutId = futId;
+ nearFinMiniId = miniId;
+ }
+
+ /**
+ * @return Near finish mini future ID.
+ */
+ public final int nearFinishMiniId() {
+ return nearFinMiniId;
+ }
+
+ /**
* @param node Node.
*/
- public void addLockTransactionNode(ClusterNode node) {
+ void addLockTransactionNode(ClusterNode node) {
assert node != null;
assert !node.isLocal();
@@ -216,11 +274,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
protected abstract UUID nearNodeId();
/**
- * @return Near future ID.
- */
- protected abstract IgniteUuid nearFutureId();
-
- /**
* Adds reader to cached entry.
*
* @param msgId Message ID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index c8e06af..7c69021 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -175,9 +175,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/** Trackable flag. */
private boolean trackable = true;
- /** Near mini future id. */
- private int nearMiniId;
-
/** DHT versions map. */
private Map<IgniteTxKey, GridCacheVersion> dhtVerMap;
@@ -213,7 +210,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @param cctx Context.
* @param tx Transaction.
* @param timeout Timeout.
- * @param nearMiniId Near mini future id.
* @param dhtVerMap DHT versions map.
* @param last {@code True} if this is last prepare operation for node.
* @param retVal Return value flag.
@@ -222,13 +218,15 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
GridCacheSharedContext cctx,
final GridDhtTxLocalAdapter tx,
long timeout,
- int nearMiniId,
Map<IgniteTxKey, GridCacheVersion> dhtVerMap,
boolean last,
boolean retVal
) {
super(REDUCER);
+ assert tx.nearPrepareFutureId() != null;
+ assert tx.nearPrepareMiniId() != 0;
+
this.cctx = cctx;
this.tx = tx;
this.dhtVerMap = dhtVerMap;
@@ -236,8 +234,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
futId = IgniteUuid.randomUuid();
- this.nearMiniId = nearMiniId;
-
if (log == null) {
msgLog = cctx.txPrepareMessageLogger();
log = U.logger(cctx.kernalContext(), logRef, GridDhtTxPrepareFuture.class);
@@ -263,7 +259,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @return Near mini future id.
*/
int nearMiniId() {
- return nearMiniId;
+ return tx.nearPrepareMiniId();
}
/** {@inheritDoc} */
@@ -860,8 +856,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
-1,
tx.nearXidVersion(),
- tx.colocated() ? tx.xid() : tx.nearFutureId(),
- nearMiniId,
+ tx.nearPrepareFutureId(),
+ nearMiniId(),
tx.xidVersion(),
tx.writeVersion(),
ret,
@@ -1223,17 +1219,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
final boolean dhtReplyNear = tx.dhtReplyNear();
- Collection<UUID> backupNodes;
- IgniteUuid nearFutId;
-
- if (dhtReplyNear) {
- backupNodes = tx.transactionNodes().get(cctx.localNodeId());
- nearFutId = tx.colocated() ? tx.xid() : tx.nearFutureId();
- }
- else {
- backupNodes = null;
- nearFutId = null;
- }
+ Collection<UUID> backupNodes = dhtReplyNear ? tx.transactionNodes().get(cctx.localNodeId()) : null;
// Assign keys to primary nodes.
if (!F.isEmpty(writes)) {
@@ -1285,8 +1271,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
assert txNodes != null;
GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
- dhtReplyNear ? nearFutId : futId,
- dhtReplyNear ? nearMiniId : fut.futureId(),
+ dhtReplyNear ? tx.nearPrepareFutureId() : futId,
+ dhtReplyNear ? tx.nearPrepareMiniId() : fut.futureId(),
tx.topologyVersion(),
tx,
timeout,
@@ -1403,8 +1389,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
add(fut); // Append new future.
GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
- dhtReplyNear ? nearFutId : futId,
- dhtReplyNear ? nearMiniId : fut.futureId(),
+ dhtReplyNear ? tx.nearPrepareFutureId() : futId,
+ dhtReplyNear ? tx.nearPrepareMiniId() : fut.futureId(),
tx.topologyVersion(),
tx,
timeout,
@@ -1503,6 +1489,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/**
* @param entry Transaction entry.
+ * @param backupNodes Node IDs collection if tx was mapped on near node.
*/
private void map(IgniteTxEntry entry, @Nullable Collection<UUID> backupNodes) {
if (entry.cached().isLocal())
http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index cd0e7fd..6b2daf8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -19,9 +19,11 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
@@ -36,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -200,13 +203,18 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
MiniFuture mini = miniFuture(res.miniId());
if (mini != null)
- mini.onResult(res);
+ mini.onPrimaryResponse(res);
}
}
/** {@inheritDoc} */
@Override public void onDhtResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
- assert false; // TODO IGNITE-4768.
+ assert res.nearNodeResponse() : res;
+
+ MiniFuture mini = miniFuture(res.miniId());
+
+ if (mini != null)
+ mini.onDhtResponse(nodeId, res);
}
/** {@inheritDoc} */
@@ -347,11 +355,13 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
+ boolean dhtReplyNear = true;
+
for (IgniteTxEntry write : writes)
- map(write, topVer, mappings, txMapping, remap, topLocked);
+ dhtReplyNear = map(write, topVer, mappings, txMapping, remap, topLocked, dhtReplyNear);
for (IgniteTxEntry read : reads)
- map(read, topVer, mappings, txMapping, remap, topLocked);
+ dhtReplyNear = map(read, topVer, mappings, txMapping, remap, topLocked, dhtReplyNear);
if (keyLockFut != null)
keyLockFut.onAllKeysAdded();
@@ -371,10 +381,25 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
checkOnePhase(txMapping);
+ if (tx.onePhaseCommit())
+ dhtReplyNear = false;
+
+ tx.dhtReplyNear(dhtReplyNear);
+
for (GridDistributedTxMapping m : mappings.values()) {
assert !m.empty();
- add(new MiniFuture(this, m, ++miniId));
+ Set<UUID> dhtNodes;
+
+ if (dhtReplyNear) {
+ dhtNodes = new HashSet<>(txMapping.transactionNodes().get(m.primary().id()));
+
+ assert !dhtNodes.isEmpty();
+ }
+ else
+ dhtNodes = null;
+
+ add(new MiniFuture(this, m, ++miniId, dhtNodes));
}
Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
@@ -389,7 +414,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
MiniFuture fut = (MiniFuture)fut0;
- IgniteCheckedException err = prepare(fut, txMapping);
+ IgniteCheckedException err = prepare(fut, txMapping, dhtReplyNear);
if (err != null) {
while (it.hasNext()) {
@@ -425,7 +450,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
* @param fut Mini future.
* @return Prepare error if any.
*/
- @Nullable private IgniteCheckedException prepare(final MiniFuture fut, GridDhtTxMapping txMapping) {
+ @Nullable private IgniteCheckedException prepare(final MiniFuture fut,
+ GridDhtTxMapping txMapping,
+ boolean dhtReplyNear) {
GridDistributedTxMapping m = fut.mapping();
final ClusterNode primary = m.primary();
@@ -449,7 +476,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
m.writes(),
m.near(),
txMapping.transactionNodes(),
- false,
+ dhtReplyNear,
m.last(),
tx.onePhaseCommit(),
tx.needReturnValue() && tx.implicit(),
@@ -490,7 +517,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
@Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
try {
- fut.onResult(prepFut.get());
+ fut.onPrimaryResponse(prepFut.get());
}
catch (IgniteCheckedException e) {
fut.onResult(e);
@@ -526,19 +553,30 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
* @param remap Remap flag.
* @param topLocked Topology locked flag.
*/
- private void map(
+ private boolean map(
IgniteTxEntry entry,
AffinityTopologyVersion topVer,
Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> curMapping,
GridDhtTxMapping txMapping,
boolean remap,
- boolean topLocked
+ boolean topLocked,
+ boolean dhtReplyNear
) {
GridCacheContext cacheCtx = entry.context();
- List<ClusterNode> nodes = cacheCtx.isLocal() ?
- cacheCtx.affinity().nodesByKey(entry.key(), topVer) :
- cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer);
+ List<ClusterNode> nodes;
+
+ if (!cacheCtx.isLocal()) {
+ GridDhtPartitionTopology top = cacheCtx.topology();
+
+ nodes = top.nodes(cacheCtx.affinity().partition(entry.key()), topVer);
+
+ if (dhtReplyNear &&
+ (!top.rebalanceFinished(topVer) || cctx.discovery().hasNearCache(cacheCtx.cacheId(), topVer) || nodes.size() == 1))
+ dhtReplyNear = false;
+ }
+ else
+ nodes = cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer);
txMapping.addMapping(nodes);
@@ -620,6 +658,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
}
}
+
+ return dhtReplyNear;
}
/** {@inheritDoc} */
@@ -711,15 +751,24 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
@SuppressWarnings("UnusedDeclaration")
private volatile int rcvRes;
+ /** */
+ private final Set<UUID> dhtNodes;
+
/**
* @param parent Parent future.
* @param m Mapping.
* @param futId Mini future ID.
*/
- MiniFuture(GridNearOptimisticSerializableTxPrepareFuture parent, GridDistributedTxMapping m, int futId) {
+ MiniFuture(GridNearOptimisticSerializableTxPrepareFuture parent,
+ GridDistributedTxMapping m,
+ int futId,
+ Set<UUID> dhtNodes) {
+ assert dhtNodes == null || !dhtNodes.isEmpty();
+
this.parent = parent;
this.m = m;
this.futId = futId;
+ this.dhtNodes = dhtNodes;
}
/**
@@ -779,10 +828,27 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
/**
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
+ void onDhtResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
+ assert dhtNodes != null;
+
+ boolean done;
+
+ synchronized (dhtNodes) {
+ done = dhtNodes.remove(nodeId) && dhtNodes.isEmpty();
+ }
+
+ if (done)
+ onDone();
+ }
+
+ /**
* @param res Result callback.
*/
@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
- void onResult(final GridNearTxPrepareResponse res) {
+ void onPrimaryResponse(final GridNearTxPrepareResponse res) {
if (isDone())
return;
@@ -885,6 +951,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
onDone(res);
}
else {
+ assert dhtNodes == null;
+
parent.processPrimaryPrepareResponse(m, res);
// Finish this mini future (need result only on client node).
http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 1d3eaec..857f237 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -104,7 +104,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
if (f != null) {
assert f.primary().id().equals(nodeId);
- f.onPrimaryResult(res);
+ f.onPrimaryResponse(res);
}
else {
if (msgLog.isDebugEnabled()) {
@@ -223,12 +223,10 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
if (!cacheCtx.isLocal()) {
GridDhtPartitionTopology top = cacheCtx.topology();
- if (dhtReplyNear && (!top.rebalanceFinished(topVer) || cctx.discovery().hasNearCache(cacheCtx.cacheId(), topVer)))
- dhtReplyNear = false;
-
nodes = top.nodes(cacheCtx.affinity().partition(txEntry.key()), topVer);
- if (nodes.size() == 1)
+ if (dhtReplyNear &&
+ (!top.rebalanceFinished(topVer) || cctx.discovery().hasNearCache(cacheCtx.cacheId(), topVer) || nodes.size() == 1))
dhtReplyNear = false;
}
else
@@ -319,7 +317,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
@Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
try {
- fut.onPrimaryResult(prepFut.get());
+ fut.onPrimaryResponse(prepFut.get());
}
catch (IgniteCheckedException e) {
fut.onError(e);
@@ -466,7 +464,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
/**
* @param res Response.
*/
- void onPrimaryResult(GridNearTxPrepareResponse res) {
+ void onPrimaryResponse(GridNearTxPrepareResponse res) {
if (res.error() != null)
onError(res.error());
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 44455ca..56b7284 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -1029,8 +1029,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
done = dhtNodes.remove(nodeId) && dhtNodes.isEmpty();
}
- if (done)
+ if (done) {
+ cctx.tm().onFinishedRemote(primary().id(), tx.threadId());
+
onDone(tx);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 8ed749c..80b93e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -214,13 +214,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
/** {@inheritDoc} */
- @Override protected IgniteUuid nearFutureId() {
- assert false : "nearFutureId should not be called for colocated transactions.";
-
- return null;
- }
-
- /** {@inheritDoc} */
@Override protected IgniteInternalFuture<Boolean> addReader(
long msgId,
GridDhtCacheEntry cached,
@@ -1002,7 +995,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
cctx,
this,
timeout,
- 0,
Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
last,
needReturnValue() && implicit());
@@ -1065,7 +1057,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
// Do not create finish future if there are no remote nodes.
if (F.isEmpty(dhtMap) && F.isEmpty(nearMap)) {
if (prep != null)
- return (IgniteInternalFuture<IgniteInternalTx>)(IgniteInternalFuture)prep;
+ return (IgniteInternalFuture<IgniteInternalTx>)prep;
return new GridFinishedFuture<IgniteInternalTx>(this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 71b847a..aad0e34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -356,10 +356,6 @@ public class IgniteTxHandler {
if (tx == null)
U.warn(log, "Missing local transaction for mapped near version [nearVer=" + req.version()
+ ", mappedVer=" + mappedVer + ']');
- else {
- if (req.concurrency() == PESSIMISTIC)
- tx.nearFutureId(req.futureId());
- }
}
else {
GridDhtPartitionTopology top = null;
@@ -470,6 +466,7 @@ public class IgniteTxHandler {
tx.explicitLock(true);
tx.transactionNodes(req.transactionNodes());
+ tx.nearPrepareFutureId(req.futureId(), req.miniId());
tx.dhtReplyNear(req.dhtReplyNear());
@@ -490,7 +487,6 @@ public class IgniteTxHandler {
req.writes(),
req.dhtVersions(),
req.messageId(),
- req.miniId(),
req.transactionNodes(),
req.last());
@@ -771,8 +767,10 @@ public class IgniteTxHandler {
* @param req Finish request.
* @return Finish future.
*/
- private IgniteInternalFuture<IgniteInternalTx> finishDhtLocal(UUID nodeId, @Nullable GridNearTxLocal locTx,
- GridNearTxFinishRequest req) {
+ private IgniteInternalFuture<IgniteInternalTx> finishDhtLocal(UUID nodeId,
+ @Nullable GridNearTxLocal locTx,
+ GridNearTxFinishRequest req)
+ {
GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version());
GridDhtTxLocal tx = null;
@@ -853,6 +851,7 @@ public class IgniteTxHandler {
assert req.syncMode() != null : req;
tx.syncMode(req.syncMode());
+ tx.nearFinishFutureId(req.futureId(), req.miniId());
if (req.commit()) {
tx.storeEnabled(req.storeEnabled());
@@ -864,9 +863,6 @@ public class IgniteTxHandler {
return null;
}
- tx.nearFinishFutureId(req.futureId());
- tx.nearFinishMiniId(req.miniId());
-
IgniteInternalFuture<IgniteInternalTx> commitFut = tx.commitAsync();
// Only for error logging.
@@ -875,9 +871,6 @@ public class IgniteTxHandler {
return commitFut;
}
else {
- tx.nearFinishFutureId(req.futureId());
- tx.nearFinishMiniId(req.miniId());
-
IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync();
// Only for error logging.