You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/14 08:00:52 UTC
[33/38] ignite git commit: ignite-4768
ignite-4768
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8d10806f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8d10806f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8d10806f
Branch: refs/heads/ignite-4768
Commit: 8d10806f40c24cd77ba72430811ff1464be7a01e
Parents: de2697d
Author: sboikov <sb...@gridgain.com>
Authored: Mon Mar 13 14:50:23 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Mar 13 14:50:23 2017 +0300
----------------------------------------------------------------------
.../ignite/codegen/MessageCodeGenerator.java | 3 +-
.../processors/cache/GridCacheIoManager.java | 4 +-
.../GridDistributedTxFinishRequest.java | 13 ++
.../distributed/dht/GridDhtTxFinishFuture.java | 83 +++++----
.../distributed/dht/GridDhtTxFinishRequest.java | 4 +
.../distributed/dht/GridDhtTxPrepareFuture.java | 14 +-
.../dht/GridDhtTxPrepareRequest.java | 59 +------
.../near/GridNearTxFinishFuture.java | 12 +-
.../near/GridNearTxFinishRequest.java | 2 +
.../cache/transactions/IgniteTxHandler.java | 174 +++++++++++--------
10 files changed, 193 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 5a26187..85e2cbe 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -170,7 +171,7 @@ public class MessageCodeGenerator {
// gen.generateAndWrite(GridNearAtomicUpdateRequest.class);
-// gen.generateAndWrite(GridMessageCollection.class);
+ gen.generateAndWrite(GridDhtTxPrepareRequest.class);
// gen.generateAndWrite(DataStreamerEntry.class);
// gen.generateAndWrite(GridDistributedLockRequest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 71f4e1c..e91bc9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -367,8 +367,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
unmarshall(nodeId, cacheMsg);
-// if (!cacheMsg.partitionExchangeMessage())
-// log.info("Message [from=" + nodeId + ", msg=" + cacheMsg + ']');
+ if (!cacheMsg.partitionExchangeMessage())
+ log.info("Message [from=" + nodeId + ", msg=" + cacheMsg + ']');
if (cacheMsg.classError() != null)
processFailedMessage(nodeId, cacheMsg, c);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index ab9f0ff..c794f96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -63,6 +63,9 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
/** */
protected static final int STORE_ENABLED_FLAG_MASK = 0x20;
+ /** */
+ private static final int DHT_REPLY_NEAR_FLAG_MASK = 0x40;
+
/** Topology version. */
private AffinityTopologyVersion topVer;
@@ -138,6 +141,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
@NotNull AffinityTopologyVersion topVer,
@Nullable GridCacheVersion commitVer,
long threadId,
+ boolean dhtReplyNear,
boolean commit,
boolean invalidate,
boolean sys,
@@ -171,6 +175,15 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
this.txSize = txSize;
completedVersions(committedVers, rolledbackVers);
+
+ setFlag(dhtReplyNear, DHT_REPLY_NEAR_FLAG_MASK);
+ }
+
+ /**
+ * @return {@code True} if transaction works in mode when DHT nodes reply directly to near node.
+ */
+ public final boolean dhtReplyNear() {
+ return isFlag(DHT_REPLY_NEAR_FLAG_MASK);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 17e9047..72a9b73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -236,7 +236,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
if (finishErr == null)
finishErr = this.tx.commitError();
- if (this.tx.syncMode() != PRIMARY_SYNC)
+ if (this.tx.syncMode() != PRIMARY_SYNC && !this.tx.dhtReplyNear())
this.tx.sendFinishReply(finishErr);
// Don't forget to clean up.
@@ -322,7 +322,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.commitVersion(),
tx.threadId(),
tx.isolation(),
- false,
+ /*dhtReplyNear*/false, // TODO IGNITE-4768.
+ /*commit*/false,
tx.isInvalidate(),
tx.system(),
tx.ioPolicy(),
@@ -387,6 +388,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
boolean sync = tx.syncMode() == FULL_SYNC;
+ boolean dhtReplyNear = tx.dhtReplyNear() && tx.syncMode() == FULL_SYNC;
+
if (tx.explicitLock())
sync = true;
@@ -406,9 +409,13 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
// Nothing to send.
continue;
- MiniFuture fut = new MiniFuture(++miniId, dhtMapping, nearMapping);
+ MiniFuture fut = null;
- add(fut); // Append new future.
+ if (!dhtReplyNear) {
+ fut = new MiniFuture(++miniId, dhtMapping, nearMapping);
+
+ add(fut); // Append new future.
+ }
Collection<Long> updCntrs = new ArrayList<>(dhtMapping.entries().size());
@@ -417,13 +424,14 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
GridDhtTxFinishRequest req = new GridDhtTxFinishRequest(
tx.nearNodeId(),
- futId,
- fut.futureId(),
+ dhtReplyNear ? tx.nearFutureId() : futId,
+ dhtReplyNear ? 0 : fut.futureId(),
tx.topologyVersion(),
tx.xidVersion(),
tx.commitVersion(),
tx.threadId(),
tx.isolation(),
+ dhtReplyNear,
commit,
tx.isInvalidate(),
tx.system(),
@@ -455,22 +463,24 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
if (sync)
res = true;
- else
+ else if (fut != null)
fut.onDone();
}
catch (IgniteCheckedException e) {
- // Fail the whole thing.
- if (e instanceof ClusterTopologyCheckedException)
- fut.onNodeLeft((ClusterTopologyCheckedException)e);
- else {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DHT finish fut, failed to send request dht [txId=" + tx.nearXidVersion() +
- ", dhtTxId=" + tx.xidVersion() +
- ", node=" + n.id() +
- ", err=" + e + ']');
- }
+ if (fut != null) {
+ if (e instanceof ClusterTopologyCheckedException)
+ fut.onNodeLeft((ClusterTopologyCheckedException)e);
+ else {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DHT finish fut, failed to send request dht [txId=" + tx.nearXidVersion() +
+ ", dhtTxId=" + tx.xidVersion() +
+ ", node=" + n.id() +
+ ", err=" + e + ']');
+ }
- fut.onResult(e);
+ // TODO IGNITE-4768: reply on near with error?
+ fut.onResult(e);
+ }
}
}
}
@@ -481,19 +491,24 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
// Nothing to send.
continue;
- MiniFuture fut = new MiniFuture(++miniId, null, nearMapping);
+ MiniFuture fut = null;
- add(fut); // Append new future.
+ if (!dhtReplyNear) {
+ fut = new MiniFuture(++miniId, null, nearMapping);
+
+ add(fut); // Append new future.
+ }
GridDhtTxFinishRequest req = new GridDhtTxFinishRequest(
tx.nearNodeId(),
futId,
- fut.futureId(),
+ fut != null ? fut.futureId() : -1,
tx.topologyVersion(),
tx.xidVersion(),
tx.commitVersion(),
tx.threadId(),
tx.isolation(),
+ dhtReplyNear,
commit,
tx.isInvalidate(),
tx.system(),
@@ -524,22 +539,24 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
if (sync)
res = true;
- else
+ else if (fut != null)
fut.onDone();
}
catch (IgniteCheckedException e) {
- // Fail the whole thing.
- if (e instanceof ClusterTopologyCheckedException)
- fut.onNodeLeft((ClusterTopologyCheckedException)e);
- else {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DHT finish fut, failed to send request near [txId=" + tx.nearXidVersion() +
- ", dhtTxId=" + tx.xidVersion() +
- ", node=" + nearMapping.primary().id() +
- ", err=" + e + ']');
+ if (fut != null) {
+ if (e instanceof ClusterTopologyCheckedException)
+ fut.onNodeLeft((ClusterTopologyCheckedException)e);
+ else {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DHT finish fut, failed to send request near [txId=" + tx.nearXidVersion() +
+ ", dhtTxId=" + tx.xidVersion() +
+ ", node=" + nearMapping.primary().id() +
+ ", err=" + e + ']');
+ }
+
+ // TODO IGNITE-4768: reply on near with error?
+ fut.onResult(e);
}
-
- fut.onResult(e);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index d9b3ae7..40f96c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -106,6 +106,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
GridCacheVersion commitVer,
long threadId,
TransactionIsolation isolation,
+ boolean dhtReplyNear,
boolean commit,
boolean invalidate,
boolean sys,
@@ -129,6 +130,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
topVer,
commitVer,
threadId,
+ dhtReplyNear,
commit,
invalidate,
sys,
@@ -190,6 +192,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
GridCacheVersion commitVer,
long threadId,
TransactionIsolation isolation,
+ boolean dhtReplyNear,
boolean commit,
boolean invalidate,
boolean sys,
@@ -216,6 +219,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
commitVer,
threadId,
isolation,
+ dhtReplyNear,
commit,
invalidate,
sys,
http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index ca028f8..c8e06af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1276,7 +1276,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
MiniFuture fut = null;
- if (!tx.dhtReplyNear()) {
+ if (!dhtReplyNear) {
fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
add(fut); // Append new future.
@@ -1285,10 +1285,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
assert txNodes != null;
GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
- futId,
- fut != null ? fut.futureId() : 0,
- nearFutId,
- nearMiniId,
+ dhtReplyNear ? nearFutId : futId,
+ dhtReplyNear ? nearMiniId : fut.futureId(),
tx.topologyVersion(),
tx,
timeout,
@@ -1405,10 +1403,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
add(fut); // Append new future.
GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
- futId,
- fut.futureId(),
- nearFutId,
- nearMiniId,
+ dhtReplyNear ? nearFutId : futId,
+ dhtReplyNear ? nearMiniId : fut.futureId(),
tx.topologyVersion(),
tx,
timeout,
http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 85a65a8..04a296d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -61,12 +61,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** Mini future ID. */
private int miniId;
- /** Future ID. */
- private IgniteUuid nearFutId;
-
- /** Mini future ID. */
- private int nearMiniId;
-
/** Topology version. */
private AffinityTopologyVersion topVer;
@@ -113,8 +107,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/**
* @param futId Future ID.
* @param miniId Mini future ID.
- * @param nearFutId Near node future ID.
- * @param nearMiniId Near node mini future ID.
* @param topVer Topology version.
* @param tx Transaction.
* @param timeout Transaction timeout.
@@ -129,8 +121,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
public GridDhtTxPrepareRequest(
IgniteUuid futId,
int miniId,
- IgniteUuid nearFutId,
- int nearMiniId,
AffinityTopologyVersion topVer,
GridDhtTxLocalAdapter tx,
long timeout,
@@ -156,8 +146,8 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
onePhaseCommit,
addDepInfo);
- assert dhtNearReply || (futId != null && miniId != 0);
- assert !dhtNearReply || (nearFutId != null && nearMiniId != 0);
+ assert futId != null;
+ assert miniId != 0;
this.topVer = topVer;
this.futId = futId;
@@ -166,8 +156,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
this.nearXidVer = nearXidVer;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
- this.nearFutId = nearFutId;
- this.nearMiniId = nearMiniId;
needReturnValue(retVal);
@@ -264,21 +252,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
}
/**
- * @return Near future ID.
- */
- public IgniteUuid nearFutureId() {
- return nearFutId;
- }
-
- /**
- * @return Near mini future ID.
- */
- public int nearMiniId() {
- return nearMiniId;
- }
-
-
- /**
* @return Topology version.
*/
@Override public AffinityTopologyVersion topologyVersion() {
@@ -395,18 +368,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
writer.incrementState();
- case 23:
- if (!writer.writeIgniteUuid("nearFutId", nearFutId))
- return false;
-
- writer.incrementState();
-
- case 24:
- if (!writer.writeInt("nearMiniId", nearMiniId))
- return false;
-
- writer.incrementState();
-
case 25:
if (!writer.writeUuid("nearNodeId", nearNodeId))
return false;
@@ -501,22 +462,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 23:
- nearFutId = reader.readIgniteUuid("nearFutId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 24:
- nearMiniId = reader.readInt("nearMiniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
case 25:
nearNodeId = reader.readUuid("nearNodeId");
http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index c55d515..44455ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -716,10 +716,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
// Version to be added in completed versions on primary node.
GridCacheVersion completedVer = !commit && tx.timeout() > 0 ? tx.xidVersion() : null;
+ boolean dhtReplyNear = tx.dhtReplyNear() && syncMode == FULL_SYNC;
+
GridNearTxFinishRequest req = new GridNearTxFinishRequest(
futId,
tx.xidVersion(),
tx.threadId(),
+ dhtReplyNear,
commit,
tx.isInvalidate(),
tx.system(),
@@ -737,8 +740,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
tx.activeCachesDeploymentEnabled()
);
- boolean dhtReplyNear = tx.dhtReplyNear() && syncMode == FULL_SYNC;
-
// If this is the primary node for the keys.
if (n.isLocal()) {
req.miniId(miniId);
@@ -855,11 +856,12 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
tx.commitVersion(),
tx.threadId(),
tx.isolation(),
- true,
- false,
+ /*dhtReplyNear*/false,
+ /*commit*/true,
+ /*invalidate*/false,
tx.system(),
tx.ioPolicy(),
- false,
+ /*sysInvalidate*/false,
tx.syncMode(),
null,
null,
http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index 05c1f3e..58e75f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -73,6 +73,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
IgniteUuid futId,
GridCacheVersion xidVer,
long threadId,
+ boolean dhtReplyNear,
boolean commit,
boolean invalidate,
boolean sys,
@@ -94,6 +95,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
topVer,
null,
threadId,
+ dhtReplyNear,
commit,
invalidate,
sys,
http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 754979c..71b847a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -39,6 +39,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
@@ -967,8 +969,8 @@ public class IgniteTxHandler {
nearRes = new GridDhtTxPrepareResponse(
req.partition(),
req.nearXidVersion(),
- req.nearFutureId(),
- req.nearMiniId(),
+ req.futureId(),
+ req.miniId(),
req.deployInfo() != null);
nearRes.nearNodeResponse(true);
@@ -1036,6 +1038,7 @@ public class IgniteTxHandler {
if (nearTx != null)
nearTx.rollback();
+ // TODO IGNITE-4768.
res = new GridDhtTxPrepareResponse(
req.partition(),
req.version(),
@@ -1116,19 +1119,19 @@ public class IgniteTxHandler {
@SuppressWarnings({"unchecked"})
private void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest req) {
assert nodeId != null;
- assert req != null;
+ assert req.nearNodeId() != null : req;
if (req.checkCommitted()) {
boolean committed = req.waitRemoteTransactions() || !ctx.tm().addRolledbackTx(null, req.version());
if (!committed || req.syncMode() != FULL_SYNC)
- sendReply(nodeId, req, committed, null);
+ sendCheckCommittedReply(nodeId, req, committed);
else {
IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version());
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
- sendReply(nodeId, req, true, null);
+ sendCheckCommittedReply(nodeId, req, true);
}
});
}
@@ -1181,16 +1184,16 @@ public class IgniteTxHandler {
if (completeFut != null) {
completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) {
- sendReply(nodeId, req, true, nearTxId);
+ @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+ sendFinishReply(nodeId, req, nearTxId);
}
});
}
else
- sendReply(nodeId, req, true, nearTxId);
+ sendFinishReply(nodeId, req, nearTxId);
}
else
- sendReply(nodeId, req, true, null);
+ sendFinishReply(nodeId, req, null);
assert req.txState() != null || (ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null) : req;
}
@@ -1350,81 +1353,75 @@ public class IgniteTxHandler {
}
/**
- * Sends tx finish response to remote node, if response is requested.
- *
* @param nodeId Node id that originated finish request.
* @param req Request.
* @param committed {@code True} if transaction committed on this node.
- * @param nearTxId Near tx version.
*/
- private void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) {
- if (req.replyRequired() || req.checkCommitted()) {
- GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(
- req.partition(),
- req.version(),
- req.futureId(),
- req.miniId());
+ private void sendCheckCommittedReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed) {
+ assert req.checkCommitted() : req;
- if (req.checkCommitted()) {
- res.checkCommitted(true);
-
- if (committed) {
- if (req.needReturnValue()) {
- try {
- GridCacheReturnCompletableWrapper wrapper = ctx.tm().getCommittedTxReturn(req.version());
-
- if (wrapper != null)
- res.returnValue(wrapper.fut().get());
- else
- assert !ctx.discovery().alive(nodeId) : nodeId;
- }
- catch (IgniteCheckedException ignored) {
- if (txFinishMsgLog.isDebugEnabled()) {
- txFinishMsgLog.debug("Failed to gain entry processor return value. [txId=" + nearTxId +
- ", dhtTxId=" + req.version() +
- ", node=" + nodeId + ']');
- }
- }
- }
- }
- else {
- ClusterTopologyCheckedException cause =
- new ClusterTopologyCheckedException("Primary node left grid.");
+ GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(
+ req.partition(),
+ req.version(),
+ req.futureId(),
+ req.miniId());
- res.checkCommittedError(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
- "(transaction has been rolled back on backup node): " + req.version(), cause));
- }
- }
+ res.checkCommitted(true);
- try {
- ctx.io().send(nodeId, res, req.policy());
+ if (committed) {
+ if (req.needReturnValue()) {
+ try {
+ GridCacheReturnCompletableWrapper wrapper = ctx.tm().getCommittedTxReturn(req.version());
- if (txFinishMsgLog.isDebugEnabled()) {
- txFinishMsgLog.debug("Sent dht tx finish response [txId=" + nearTxId +
- ", dhtTxId=" + req.version() +
- ", node=" + nodeId +
- ", checkCommitted=" + req.checkCommitted() + ']');
+ if (wrapper != null)
+ res.returnValue(wrapper.fut().get());
+ else
+ assert !ctx.discovery().alive(nodeId) : nodeId;
}
- }
- catch (Throwable e) {
- // Double-check.
- if (ctx.discovery().node(nodeId) == null) {
+ catch (IgniteCheckedException ignored) {
if (txFinishMsgLog.isDebugEnabled()) {
- txFinishMsgLog.debug("Node left while send dht tx finish response [txId=" + nearTxId +
+ txFinishMsgLog.debug("Failed to get return value. [txId=null" +
", dhtTxId=" + req.version() +
", node=" + nodeId + ']');
}
}
- else {
- U.error(log, "Failed to send finish response to node [txId=" + nearTxId +
- ", dhtTxId=" + req.version() +
- ", nodeId=" + nodeId +
- ", res=" + res + ']', e);
- }
+ }
+ }
+ else {
+ ClusterTopologyCheckedException cause =
+ new ClusterTopologyCheckedException("Primary node left grid.");
- if (e instanceof Error)
- throw (Error)e;
+ res.checkCommittedError(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
+ "(transaction has been rolled back on backup node): " + req.version(), cause));
+ }
+
+ sendFinishResponse(nodeId, res, req, null);
+ }
+
+ /**
+ * Sends tx finish response to remote node, if response is requested.
+ *
+ * @param nodeId Node id that originated finish request.
+ * @param req Request.
+ * @param nearTxId Near tx version.
+ */
+ private void sendFinishReply(UUID nodeId, GridDhtTxFinishRequest req, GridCacheVersion nearTxId) {
+ assert !req.checkCommitted();
+
+ if (req.replyRequired()) {
+ GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(
+ req.partition(),
+ req.version(),
+ req.futureId(),
+ req.miniId());
+
+ if (req.dhtReplyNear()) {
+ nodeId = req.nearNodeId();
+
+ res.nearNodeResponse(true);
}
+
+ sendFinishResponse(nodeId, res, req, nearTxId);
}
else {
if (txFinishMsgLog.isDebugEnabled()) {
@@ -1437,6 +1434,47 @@ public class IgniteTxHandler {
/**
* @param nodeId Node ID.
+ * @param res Response.
+ * @param req Request (for debug info logging).
+ * @param nearTxId (for debug info logging).
+ */
+ private void sendFinishResponse(UUID nodeId,
+ GridDistributedTxFinishResponse res,
+ GridDhtTxFinishRequest req,
+ @Nullable GridCacheVersion nearTxId) {
+ try {
+ ctx.io().send(nodeId, res, req.policy());
+
+ if (txFinishMsgLog.isDebugEnabled()) {
+ txFinishMsgLog.debug("Sent dht tx finish response [txId=" + nearTxId +
+ ", dhtTxId=" + req.version() +
+ ", node=" + nodeId +
+ ", checkCommitted=" + req.checkCommitted() + ']');
+ }
+ }
+ catch (Throwable e) {
+ // Double-check.
+ if (ctx.discovery().node(nodeId) == null) {
+ if (txFinishMsgLog.isDebugEnabled()) {
+ txFinishMsgLog.debug("Node left while send dht tx finish response [txId=" + nearTxId +
+ ", dhtTxId=" + req.version() +
+ ", node=" + nodeId + ']');
+ }
+ }
+ else {
+ U.error(log, "Failed to send finish response to node [txId=" + nearTxId +
+ ", dhtTxId=" + req.version() +
+ ", nodeId=" + nodeId +
+ ", res=" + res + ']', e);
+ }
+
+ if (e instanceof Error)
+ throw (Error)e;
+ }
+ }
+
+ /**
+ * @param nodeId Node ID.
* @param req Request.
* @param res Response or {@code null} if should not reply to primary.
* @return Remote transaction.