You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/14 08:00:40 UTC
[21/38] ignite git commit: ignite-4768
ignite-4768
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/763f1b0b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/763f1b0b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/763f1b0b
Branch: refs/heads/ignite-4768
Commit: 763f1b0b950737f00c2356fa9018f4c15bdef242
Parents: eeb10b8
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 10 16:54:37 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 10 16:54:37 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheIoManager.java | 2 +
.../GridDistributedTxFinishRequest.java | 217 ++++++++++++----
.../GridDistributedTxPrepareResponse.java | 76 +++++-
.../distributed/dht/GridDhtTxFinishFuture.java | 33 +--
.../distributed/dht/GridDhtTxFinishRequest.java | 251 ++++++-------------
.../dht/GridDhtTxFinishResponse.java | 12 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 18 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 1 +
.../dht/GridDhtTxPrepareResponse.java | 58 +++--
.../near/GridNearTxFinishFuture.java | 71 ++++--
.../near/GridNearTxFinishRequest.java | 174 ++-----------
.../near/GridNearTxFinishResponse.java | 12 +-
.../near/GridNearTxPrepareResponse.java | 47 ++--
.../cache/transactions/IgniteTxHandler.java | 33 ++-
14 files changed, 501 insertions(+), 504 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/763f1b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 7cac367..71f4e1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -547,6 +547,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridDhtTxPrepareRequest req = (GridDhtTxPrepareRequest)msg;
GridDhtTxPrepareResponse res = new GridDhtTxPrepareResponse(
+ req.partition(),
req.version(),
req.futureId(),
req.miniId(),
@@ -672,6 +673,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg;
GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+ req.partition(),
req.version(),
req.futureId(),
req.miniId(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/763f1b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index 3e47cc9..ab9f0ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -20,18 +20,24 @@ package org.apache.ignite.internal.processors.cache.distributed;
import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
/**
* Transaction completion message.
*/
@@ -39,6 +45,27 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ protected static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01;
+
+ /** */
+ protected static final int CHECK_COMMITTED_FLAG_MASK = 0x02;
+
+ /** */
+ protected static final int NEED_RETURN_VALUE_FLAG_MASK = 0x04;
+
+ /** */
+ protected static final int SYS_INVALIDATE_FLAG_MASK = 0x08;
+
+ /** */
+ protected static final int EXPLICIT_LOCK_FLAG_MASK = 0x10;
+
+ /** */
+ protected static final int STORE_ENABLED_FLAG_MASK = 0x20;
+
+ /** Topology version. */
+ private AffinityTopologyVersion topVer;
+
/** Future ID. */
private IgniteUuid futId;
@@ -54,14 +81,6 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
/** Commit flag. */
private boolean commit;
- /** Sync commit flag. */
- @Deprecated
- private boolean syncCommit;
-
- /** Sync commit flag. */
- @Deprecated
- private boolean syncRollback;
-
/** Min version used as base for completed versions. */
private GridCacheVersion baseVer;
@@ -74,6 +93,18 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
/** IO policy. */
private byte plc;
+ /** Subject ID. */
+ private UUID subjId;
+
+ /** Task name hash. */
+ private int taskNameHash;
+
+ /** */
+ private byte flags;
+
+ /** Write synchronization mode. */
+ private CacheWriteSynchronizationMode syncMode;
+
/** Transient TX state. */
@GridDirectTransient
private IgniteTxState txState;
@@ -94,8 +125,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
* @param invalidate Invalidate flag.
* @param sys System transaction flag.
* @param plc IO policy.
- * @param syncCommit Sync commit flag.
- * @param syncRollback Sync rollback flag.
+ * @param syncMode Write synchronization mode.
* @param baseVer Base version.
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
@@ -105,39 +135,93 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
public GridDistributedTxFinishRequest(
GridCacheVersion xidVer,
IgniteUuid futId,
+ @NotNull AffinityTopologyVersion topVer,
@Nullable GridCacheVersion commitVer,
long threadId,
boolean commit,
boolean invalidate,
boolean sys,
byte plc,
- boolean syncCommit,
- boolean syncRollback,
+ CacheWriteSynchronizationMode syncMode,
GridCacheVersion baseVer,
Collection<GridCacheVersion> committedVers,
Collection<GridCacheVersion> rolledbackVers,
+ @Nullable UUID subjId,
+ int taskNameHash,
int txSize,
boolean addDepInfo
) {
super(xidVer, 0, addDepInfo);
+
assert xidVer != null;
+ assert syncMode != null;
this.futId = futId;
+ this.topVer = topVer;
this.commitVer = commitVer;
this.threadId = threadId;
this.commit = commit;
this.invalidate = invalidate;
this.sys = sys;
this.plc = plc;
- this.syncCommit = syncCommit;
- this.syncRollback = syncRollback;
+ this.syncMode = syncMode;
this.baseVer = baseVer;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
this.txSize = txSize;
completedVersions(committedVers, rolledbackVers);
}
/**
+ * @return Transaction write synchronization mode (can be null if message is sent from old nodes).
+ */
+ public final CacheWriteSynchronizationMode syncMode() {
+ return syncMode;
+ }
+
+ /**
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ protected final void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ protected final boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
+ /**
+ * @return Subject ID.
+ */
+ @Nullable public final UUID subjectId() {
+ return subjId;
+ }
+
+ /**
+ * @return Task name hash.
+ */
+ public final int taskNameHash() {
+ return taskNameHash;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ @Override public final AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
* @return System transaction flag.
*/
public boolean system() {
@@ -188,27 +272,6 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
}
/**
- * @return Sync commit flag.
- */
- public boolean syncCommit() {
- return syncCommit;
- }
-
- /**
- * @param syncCommit Sync commit flag.
- */
- public void syncCommit(boolean syncCommit) {
- this.syncCommit = syncCommit;
- }
-
- /**
- * @return Sync rollback flag.
- */
- public boolean syncRollback() {
- return syncRollback;
- }
-
- /**
* @return Base version.
*/
public GridCacheVersion baseVersion() {
@@ -227,7 +290,9 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
* @return {@code True} if reply is required.
*/
public boolean replyRequired() {
- return commit ? syncCommit : syncRollback;
+ assert syncMode != null;
+
+ return syncMode == FULL_SYNC;
}
/** {@inheritDoc} */
@@ -279,48 +344,66 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
writer.incrementState();
case 10:
- if (!writer.writeIgniteUuid("futId", futId))
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
case 11:
- if (!writer.writeBoolean("invalidate", invalidate))
+ if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
case 12:
- if (!writer.writeByte("plc", plc))
+ if (!writer.writeBoolean("invalidate", invalidate))
return false;
writer.incrementState();
case 13:
- if (!writer.writeBoolean("syncCommit", syncCommit))
+ if (!writer.writeByte("plc", plc))
return false;
writer.incrementState();
case 14:
- if (!writer.writeBoolean("syncRollback", syncRollback))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 15:
- if (!writer.writeBoolean("sys", sys))
+ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
return false;
writer.incrementState();
case 16:
- if (!writer.writeLong("threadId", threadId))
+ if (!writer.writeBoolean("sys", sys))
return false;
writer.incrementState();
case 17:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 18:
+ if (!writer.writeLong("threadId", threadId))
+ return false;
+
+ writer.incrementState();
+
+ case 19:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 20:
if (!writer.writeInt("txSize", txSize))
return false;
@@ -367,7 +450,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 10:
- futId = reader.readIgniteUuid("futId");
+ flags = reader.readByte("flags");
if (!reader.isLastRead())
return false;
@@ -375,7 +458,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 11:
- invalidate = reader.readBoolean("invalidate");
+ futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
return false;
@@ -383,7 +466,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 12:
- plc = reader.readByte("plc");
+ invalidate = reader.readBoolean("invalidate");
if (!reader.isLastRead())
return false;
@@ -391,7 +474,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 13:
- syncCommit = reader.readBoolean("syncCommit");
+ plc = reader.readByte("plc");
if (!reader.isLastRead())
return false;
@@ -399,7 +482,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 14:
- syncRollback = reader.readBoolean("syncRollback");
+ subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
return false;
@@ -407,15 +490,19 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 15:
- sys = reader.readBoolean("sys");
+ byte syncModeOrd;
+
+ syncModeOrd = reader.readByte("syncMode");
if (!reader.isLastRead())
return false;
+ syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
reader.incrementState();
case 16:
- threadId = reader.readLong("threadId");
+ sys = reader.readBoolean("sys");
if (!reader.isLastRead())
return false;
@@ -423,6 +510,30 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 17:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 18:
+ threadId = reader.readLong("threadId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 19:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 20:
txSize = reader.readInt("txSize");
if (!reader.isLastRead())
@@ -442,7 +553,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 18;
+ return 21;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/763f1b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index 76a5e31..53a1391 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -51,6 +51,12 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
@GridDirectTransient
private IgniteTxState txState;
+ /** */
+ private int part;
+
+ /** */
+ private byte flags;
+
/**
* Empty constructor (required by {@link Externalizable}).
*/
@@ -59,24 +65,54 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
}
/**
- * @param xid Transaction ID.
+ * @param part Partition.
+ * @param xid Lock or transaction ID.
* @param addDepInfo Deployment info flag.
*/
- public GridDistributedTxPrepareResponse(GridCacheVersion xid, boolean addDepInfo) {
+ public GridDistributedTxPrepareResponse(int part, GridCacheVersion xid, boolean addDepInfo) {
super(xid, 0, addDepInfo);
+
+ this.part = part;
}
/**
- * @param xid Lock ID.
+ * @param part Partition.
+ * @param xid Lock or transaction ID.
* @param err Error.
* @param addDepInfo Deployment info flag.
*/
- public GridDistributedTxPrepareResponse(GridCacheVersion xid, Throwable err, boolean addDepInfo) {
+ public GridDistributedTxPrepareResponse(int part, GridCacheVersion xid, Throwable err, boolean addDepInfo) {
super(xid, 0, addDepInfo);
+ this.part = part;
this.err = err;
}
+ /**
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ protected final void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ protected final boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition() {
+ return part;
+ }
+
/** {@inheritDoc} */
@Override public Throwable error() {
return err;
@@ -106,8 +142,6 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
this.txState = txState;
}
- /** {@inheritDoc}
- * @param ctx*/
/** {@inheritDoc} */
@Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
return ctx.txPrepareMessageLogger();
@@ -150,6 +184,18 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
writer.incrementState();
+ case 8:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeInt("part", part))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -174,6 +220,22 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
reader.incrementState();
+ case 8:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ part = reader.readInt("part");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(GridDistributedTxPrepareResponse.class);
@@ -186,7 +248,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 8;
+ return 10;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/763f1b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 5d4e610..17e9047 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -179,7 +179,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
if (isMini(fut)) {
MiniFuture f = (MiniFuture)fut;
- if (f.futureId().equals(res.miniId())) {
+ if (f.futureId() == res.miniId()) {
found = true;
assert f.node().id().equals(nodeId);
@@ -304,10 +304,12 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
boolean res = false;
+ int miniId = 0;
+
for (ClusterNode n : nodes) {
assert !n.isLocal();
- MiniFuture fut = new MiniFuture(n);
+ MiniFuture fut = new MiniFuture(++miniId, n);
add(fut); // Append new future.
@@ -325,8 +327,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.system(),
tx.ioPolicy(),
tx.isSystemInvalidate(),
- sync,
- sync,
+ sync ? FULL_SYNC : tx.syncMode(),
tx.completedBase(),
tx.committedVersions(),
tx.rolledbackVersions(),
@@ -391,6 +392,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
boolean res = false;
+ int miniId = 0;
+
// Create mini futures.
for (GridDistributedTxMapping dhtMapping : dhtMap.values()) {
ClusterNode n = dhtMapping.primary();
@@ -403,7 +406,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
// Nothing to send.
continue;
- MiniFuture fut = new MiniFuture(dhtMapping, nearMapping);
+ MiniFuture fut = new MiniFuture(++miniId, dhtMapping, nearMapping);
add(fut); // Append new future.
@@ -426,8 +429,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.system(),
tx.ioPolicy(),
tx.isSystemInvalidate(),
- sync,
- sync,
+ sync ? FULL_SYNC : tx.syncMode(),
tx.completedBase(),
tx.committedVersions(),
tx.rolledbackVersions(),
@@ -479,7 +481,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
// Nothing to send.
continue;
- MiniFuture fut = new MiniFuture(null, nearMapping);
+ MiniFuture fut = new MiniFuture(++miniId, null, nearMapping);
add(fut); // Append new future.
@@ -497,8 +499,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.system(),
tx.ioPolicy(),
tx.isSystemInvalidate(),
- sync,
- sync,
+ sync ? FULL_SYNC : tx.syncMode(),
tx.completedBase(),
tx.committedVersions(),
tx.rolledbackVersions(),
@@ -573,7 +574,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
private static final long serialVersionUID = 0L;
/** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
+ private final int futId;
/** DHT mapping. */
@GridToStringInclude
@@ -588,19 +589,23 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
private ClusterNode node;
/**
+ * @param futId Future ID.
* @param node Node.
*/
- private MiniFuture(ClusterNode node) {
+ private MiniFuture(int futId, ClusterNode node) {
+ this.futId = futId;
this.node = node;
}
/**
+ * @param futId Future ID.
* @param dhtMapping Mapping.
* @param nearMapping nearMapping.
*/
- MiniFuture(GridDistributedTxMapping dhtMapping, GridDistributedTxMapping nearMapping) {
+ MiniFuture(int futId, GridDistributedTxMapping dhtMapping, GridDistributedTxMapping nearMapping) {
assert dhtMapping == null || nearMapping == null || dhtMapping.primary().equals(nearMapping.primary());
+ this.futId = futId;
this.dhtMapping = dhtMapping;
this.nearMapping = nearMapping;
}
@@ -608,7 +613,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
/**
* @return Future ID.
*/
- IgniteUuid futureId() {
+ int futureId() {
return futId;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/763f1b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index c618a18..d9b3ae7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.UUID;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
@@ -43,12 +44,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** */
private static final long serialVersionUID = 0L;
- /** */
- public static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01;
-
- /** */
- public static final int NEED_RETURN_VALUE_FLAG_MASK = 0x02;
-
/** Near node ID. */
private UUID nearNodeId;
@@ -56,22 +51,13 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
private TransactionIsolation isolation;
/** Mini future ID. */
- private IgniteUuid miniId;
-
- /** System invalidation flag. */
- private boolean sysInvalidate;
-
- /** Topology version. */
- private AffinityTopologyVersion topVer;
+ private int miniId;
/** Pending versions with order less than one for this message (needed for commit ordering). */
@GridToStringInclude
@GridDirectCollection(GridCacheVersion.class)
private Collection<GridCacheVersion> pendingVers;
- /** Check committed flag. */
- private boolean checkCommitted;
-
/** Partition update counter. */
@GridToStringInclude
@GridDirectCollection(Long.class)
@@ -80,15 +66,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** One phase commit write version. */
private GridCacheVersion writeVer;
- /** Subject ID. */
- private UUID subjId;
-
- /** Task name hash. */
- private int taskNameHash;
-
- /** */
- private byte flags;
-
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -110,8 +87,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
* @param sys System flag.
* @param plc IO policy.
* @param sysInvalidate System invalidation flag.
- * @param syncCommit Synchronous commit flag.
- * @param syncRollback Synchronous rollback flag.
+ * @param syncMode Write synchronization mode.
* @param baseVer Base version.
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
@@ -124,7 +100,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
public GridDhtTxFinishRequest(
UUID nearNodeId,
IgniteUuid futId,
- IgniteUuid miniId,
+ int miniId,
@NotNull AffinityTopologyVersion topVer,
GridCacheVersion xidVer,
GridCacheVersion commitVer,
@@ -135,8 +111,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
boolean sys,
byte plc,
boolean sysInvalidate,
- boolean syncCommit,
- boolean syncRollback,
+ CacheWriteSynchronizationMode syncMode,
GridCacheVersion baseVer,
Collection<GridCacheVersion> committedVers,
Collection<GridCacheVersion> rolledbackVers,
@@ -151,35 +126,34 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
super(
xidVer,
futId,
+ topVer,
commitVer,
threadId,
commit,
invalidate,
sys,
plc,
- syncCommit,
- syncRollback,
+ syncMode,
baseVer,
committedVers,
rolledbackVers,
+ subjId,
+ taskNameHash,
txSize,
addDepInfo);
- assert miniId != null;
+ assert miniId != 0;
assert nearNodeId != null;
assert isolation != null;
this.pendingVers = pendingVers;
- this.topVer = topVer;
this.nearNodeId = nearNodeId;
this.isolation = isolation;
this.miniId = miniId;
- this.sysInvalidate = sysInvalidate;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
needReturnValue(retVal);
waitRemoteTransactions(waitRemoteTxs);
+ systemInvalidate(sysInvalidate);
}
/**
@@ -196,8 +170,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
* @param sys System flag.
* @param plc IO policy.
* @param sysInvalidate System invalidation flag.
- * @param syncCommit Synchronous commit flag.
- * @param syncRollback Synchronous rollback flag.
+ * @param syncMode Write synchronization mode.
* @param baseVer Base version.
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
@@ -211,7 +184,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
public GridDhtTxFinishRequest(
UUID nearNodeId,
IgniteUuid futId,
- IgniteUuid miniId,
+ int miniId,
@NotNull AffinityTopologyVersion topVer,
GridCacheVersion xidVer,
GridCacheVersion commitVer,
@@ -222,8 +195,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
boolean sys,
byte plc,
boolean sysInvalidate,
- boolean syncCommit,
- boolean syncRollback,
+ CacheWriteSynchronizationMode syncMode,
GridCacheVersion baseVer,
Collection<GridCacheVersion> committedVers,
Collection<GridCacheVersion> rolledbackVers,
@@ -236,9 +208,30 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
boolean retVal,
boolean waitRemoteTxs
) {
- this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, isolation, commit, invalidate, sys, plc,
- sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, pendingVers, txSize,
- subjId, taskNameHash, addDepInfo, retVal, waitRemoteTxs);
+ this(nearNodeId,
+ futId,
+ miniId,
+ topVer,
+ xidVer,
+ commitVer,
+ threadId,
+ isolation,
+ commit,
+ invalidate,
+ sys,
+ plc,
+ sysInvalidate,
+ syncMode,
+ baseVer,
+ committedVers,
+ rolledbackVers,
+ pendingVers,
+ txSize,
+ subjId,
+ taskNameHash,
+ addDepInfo,
+ retVal,
+ waitRemoteTxs);
if (updateIdxs != null && !updateIdxs.isEmpty()) {
partUpdateCnt = new GridLongList(updateIdxs.size());
@@ -258,25 +251,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/**
* @return Mini ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
/**
- * @return Subject ID.
- */
- @Nullable public UUID subjectId() {
- return subjId;
- }
-
- /**
- * @return Task name hash.
- */
- public int taskNameHash() {
- return taskNameHash;
- }
-
- /**
* @return Transaction isolation.
*/
public TransactionIsolation isolation() {
@@ -294,7 +273,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
* @return System invalidate flag.
*/
public boolean isSystemInvalidate() {
- return sysInvalidate;
+ return isFlag(SYS_INVALIDATE_FLAG_MASK);
+ }
+
+ /**
+ * @param sysInvalidate System invalidation flag.
+ */
+ private void systemInvalidate(boolean sysInvalidate) {
+ setFlag(sysInvalidate, SYS_INVALIDATE_FLAG_MASK);
}
/**
@@ -312,63 +298,45 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
}
/**
- * @return Topology version.
- */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /**
* @return Check committed flag.
*/
public boolean checkCommitted() {
- return checkCommitted;
+ return isFlag(CHECK_COMMITTED_FLAG_MASK);
}
/**
* @param checkCommitted Check committed flag.
*/
public void checkCommitted(boolean checkCommitted) {
- this.checkCommitted = checkCommitted;
+ setFlag(checkCommitted, CHECK_COMMITTED_FLAG_MASK);
}
/**
* @return {@code True}
*/
public boolean waitRemoteTransactions() {
- return (flags & WAIT_REMOTE_TX_FLAG_MASK) != 0;
+ return isFlag(WAIT_REMOTE_TX_FLAG_MASK);
}
/**
* @param waitRemoteTxs Wait remote transactions flag.
*/
- public void waitRemoteTransactions(boolean waitRemoteTxs) {
- if (waitRemoteTxs)
- flags = (byte)(flags | WAIT_REMOTE_TX_FLAG_MASK);
- else
- flags &= ~WAIT_REMOTE_TX_FLAG_MASK;
+ private void waitRemoteTransactions(boolean waitRemoteTxs) {
+ setFlag(waitRemoteTxs, WAIT_REMOTE_TX_FLAG_MASK);
}
/**
* @return Flag indicating whether transaction needs return value.
*/
public boolean needReturnValue() {
- return (flags & NEED_RETURN_VALUE_FLAG_MASK) != 0;
+ return isFlag(NEED_RETURN_VALUE_FLAG_MASK);
}
/**
* @param retVal Need return value.
*/
public void needReturnValue(boolean retVal) {
- if (retVal)
- flags = (byte)(flags | NEED_RETURN_VALUE_FLAG_MASK);
- else
- flags &= ~NEED_RETURN_VALUE_FLAG_MASK;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxFinishRequest.class, this, super.toString());
+ setFlag(retVal, NEED_RETURN_VALUE_FLAG_MASK);
}
/** {@inheritDoc} */
@@ -386,73 +354,37 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
}
switch (writer.state()) {
- case 18:
- if (!writer.writeBoolean("checkCommitted", checkCommitted))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeByte("flags", flags))
- return false;
-
- writer.incrementState();
-
- case 20:
- if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
case 21:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
return false;
writer.incrementState();
case 22:
- if (!writer.writeUuid("nearNodeId", nearNodeId))
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
case 23:
- if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
+ if (!writer.writeUuid("nearNodeId", nearNodeId))
return false;
writer.incrementState();
case 24:
- if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
return false;
writer.incrementState();
case 25:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 26:
- if (!writer.writeBoolean("sysInvalidate", sysInvalidate))
- return false;
-
- writer.incrementState();
-
- case 27:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 28:
- if (!writer.writeMessage("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 29:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -474,23 +406,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
return false;
switch (reader.state()) {
- case 18:
- checkCommitted = reader.readBoolean("checkCommitted");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- flags = reader.readByte("flags");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 20:
+ case 21:
byte isolationOrd;
isolationOrd = reader.readByte("isolation");
@@ -502,16 +418,8 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 21:
- miniId = reader.readIgniteUuid("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
case 22:
- nearNodeId = reader.readUuid("nearNodeId");
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
@@ -519,7 +427,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 23:
- partUpdateCnt = reader.readMessage("partUpdateCnt");
+ nearNodeId = reader.readUuid("nearNodeId");
if (!reader.isLastRead())
return false;
@@ -527,7 +435,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 24:
- pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
+ partUpdateCnt = reader.readMessage("partUpdateCnt");
if (!reader.isLastRead())
return false;
@@ -535,7 +443,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 25:
- subjId = reader.readUuid("subjId");
+ pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -543,30 +451,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 26:
- sysInvalidate = reader.readBoolean("sysInvalidate");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 27:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 28:
- topVer = reader.readMessage("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 29:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -586,6 +470,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 30;
+ return 27;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtTxFinishRequest.class, this, super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/763f1b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index 8fb1f4e..7648178 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -40,7 +40,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
private static final long serialVersionUID = 0L;
/** Mini future ID. */
- private IgniteUuid miniId;
+ private int miniId;
/** Error. */
@GridDirectTransient
@@ -67,10 +67,10 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
* @param futId Future ID.
* @param miniId Mini future ID.
*/
- public GridDhtTxFinishResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId) {
+ public GridDhtTxFinishResponse(GridCacheVersion xid, IgniteUuid futId, int miniId) {
super(xid, futId);
- assert miniId != null;
+ assert miniId != 0;
this.miniId = miniId;
}
@@ -78,7 +78,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
@@ -190,7 +190,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
writer.incrementState();
case 7:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
@@ -234,7 +234,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
reader.incrementState();
case 7:
- miniId = reader.readIgniteUuid("miniId");
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/763f1b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 79c371c..1b543bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -78,7 +78,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
private IgniteUuid nearFinFutId;
/** Near future mini ID. */
- private IgniteUuid nearFinMiniId;
+ private int nearFinMiniId;
/** Near XID. */
private GridCacheVersion nearXidVer;
@@ -255,16 +255,9 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
/**
- * @return Near future mini ID.
- */
- public IgniteUuid nearFinishMiniId() {
- return nearFinMiniId;
- }
-
- /**
* @param nearFinMiniId Near future mini ID.
*/
- public void nearFinishMiniId(IgniteUuid nearFinMiniId) {
+ public void nearFinishMiniId(int nearFinMiniId) {
this.nearFinMiniId = nearFinMiniId;
}
@@ -641,8 +634,11 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
return;
}
- GridNearTxFinishResponse res = new GridNearTxFinishResponse(nearXidVer, threadId, nearFinFutId,
- nearFinMiniId, err);
+ GridNearTxFinishResponse res = new GridNearTxFinishResponse(nearXidVer,
+ threadId,
+ nearFinFutId,
+ nearFinMiniId,
+ err);
try {
cctx.io().send(nearNodeId, res, ioPolicy());
http://git-wip-us.apache.org/repos/asf/ignite/blob/763f1b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 735653d..56884ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -856,6 +856,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
assert F.isEmpty(tx.invalidPartitions());
GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+ -1,
tx.nearXidVersion(),
tx.colocated() ? tx.xid() : tx.nearFutureId(),
nearMiniId,
http://git-wip-us.apache.org/repos/asf/ignite/blob/763f1b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index 416540a..fba68ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -76,13 +76,19 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param futId Future ID.
* @param miniId Mini future ID.
* @param addDepInfo Deployment info flag.
*/
- public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, int miniId, boolean addDepInfo) {
- super(xid, addDepInfo);
+ public GridDhtTxPrepareResponse(
+ int part,
+ GridCacheVersion xid,
+ IgniteUuid futId,
+ int miniId,
+ boolean addDepInfo) {
+ super(part, xid, addDepInfo);
assert futId != null;
assert miniId != 0;
@@ -92,15 +98,21 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param futId Future ID.
* @param miniId Mini future ID.
* @param err Error.
* @param addDepInfo Deployment enabled.
*/
- public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, int miniId, Throwable err,
+ public GridDhtTxPrepareResponse(
+ int part,
+ GridCacheVersion xid,
+ IgniteUuid futId,
+ int miniId,
+ Throwable err,
boolean addDepInfo) {
- super(xid, err, addDepInfo);
+ super(part, xid, err, addDepInfo);
assert futId != null;
assert miniId != 0;
@@ -156,7 +168,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
*
* @return Collection of entry infos need to be preloaded.
*/
- public Collection<GridCacheEntryInfo> preloadEntries() {
+ Collection<GridCacheEntryInfo> preloadEntries() {
return preloadEntries == null ? Collections.<GridCacheEntryInfo>emptyList() : preloadEntries;
}
@@ -174,8 +186,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
preloadEntries.add(info);
}
- /** {@inheritDoc}
- * @param ctx*/
+ /** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
@@ -218,11 +229,6 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
}
/** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxPrepareResponse.class, this, "super", super.toString());
- }
-
- /** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -237,31 +243,31 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
}
switch (writer.state()) {
- case 8:
+ case 10:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 9:
+ case 11:
if (!writer.writeMap("invalidParts", invalidParts, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
return false;
writer.incrementState();
- case 10:
+ case 12:
if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
- case 11:
+ case 13:
if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 12:
+ case 14:
if (!writer.writeCollection("preloadEntries", preloadEntries, MessageCollectionItemType.MSG))
return false;
@@ -283,7 +289,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
return false;
switch (reader.state()) {
- case 8:
+ case 10:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -291,7 +297,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
- case 9:
+ case 11:
invalidParts = reader.readMap("invalidParts", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
if (!reader.isLastRead())
@@ -299,7 +305,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
- case 10:
+ case 12:
miniId = reader.readInt("miniId");
if (!reader.isLastRead())
@@ -307,7 +313,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
- case 11:
+ case 13:
nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -315,7 +321,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
- case 12:
+ case 14:
preloadEntries = reader.readCollection("preloadEntries", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -335,6 +341,12 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 13;
+ return 15;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtTxPrepareResponse.class, this,
+ "super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/763f1b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index c13650e..3e9069c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -197,7 +197,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (fut.getClass() == FinishMiniFuture.class) {
FinishMiniFuture f = (FinishMiniFuture)fut;
- if (f.futureId().equals(res.miniId())) {
+ if (f.futureId() == res.miniId()) {
assert f.primary().id().equals(nodeId);
finishFut = f;
@@ -241,7 +241,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (fut.getClass() == CheckBackupMiniFuture.class) {
CheckBackupMiniFuture f = (CheckBackupMiniFuture)fut;
- if (f.futureId().equals(res.miniId())) {
+ if (f.futureId() == res.miniId()) {
found = true;
assert f.node().id().equals(nodeId);
@@ -255,7 +255,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
else if (fut.getClass() == CheckRemoteTxMiniFuture.class) {
CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
- if (f.futureId().equals(res.miniId()))
+ if (f.futureId() == res.miniId())
f.onDhtFinishResponse(nodeId, false);
}
}
@@ -400,8 +400,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (mappings.single()) {
GridDistributedTxMapping mapping = mappings.singleMapping();
- if (mapping != null)
- finish(mapping, commit);
+ if (mapping != null) {
+ assert !hasFutures();
+
+ finish(1, mapping, commit);
+ }
}
else
finish(mappings.mappings(), commit);
@@ -468,6 +471,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
*
*/
private void checkBackup() {
+ assert !hasFutures();
+
GridDistributedTxMapping mapping = mappings.singleMapping();
if (mapping != null) {
@@ -495,7 +500,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
"(backup has left grid): " + tx.xidVersion(), cause));
}
else {
- final CheckBackupMiniFuture mini = new CheckBackupMiniFuture(backup, mapping);
+ final CheckBackupMiniFuture mini = new CheckBackupMiniFuture(1, backup, mapping);
add(mini);
@@ -647,16 +652,21 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* @param commit Commit flag.
*/
private void finish(Iterable<GridDistributedTxMapping> mappings, boolean commit) {
+ assert !hasFutures();
+
+ int miniId = 0;
+
// Create mini futures.
for (GridDistributedTxMapping m : mappings)
- finish(m, commit);
+ finish(++miniId, m, commit);
}
/**
+ * @param miniId Mini future ID.
* @param m Mapping.
* @param commit Commit flag.
*/
- private void finish(GridDistributedTxMapping m, boolean commit) {
+ private void finish(int miniId, GridDistributedTxMapping m, boolean commit) {
ClusterNode n = m.primary();
assert !m.empty();
@@ -692,7 +702,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
// If this is the primary node for the keys.
if (n.isLocal()) {
- req.miniId(IgniteUuid.randomUuid());
+ req.miniId(miniId);
IgniteInternalFuture<IgniteInternalTx> fut = cctx.tm().txHandler().finish(n.id(), tx, req);
@@ -701,7 +711,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
add(fut);
}
else {
- FinishMiniFuture fut = new FinishMiniFuture(m);
+ FinishMiniFuture fut = new FinishMiniFuture(miniId, m);
req.miniId(fut.futureId());
@@ -796,7 +806,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* @param waitRemoteTxs Wait for remote txs.
* @return Finish request.
*/
- private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId, boolean waitRemoteTxs) {
+ private GridDhtTxFinishRequest checkCommittedRequest(int miniId, boolean waitRemoteTxs) {
GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
cctx.localNodeId(),
futureId(),
@@ -811,8 +821,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
tx.system(),
tx.ioPolicy(),
false,
- tx.syncMode() == FULL_SYNC,
- tx.syncMode() == FULL_SYNC,
+ tx.syncMode(),
null,
null,
null,
@@ -834,7 +843,14 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
*/
private abstract class MinFuture extends GridFutureAdapter<IgniteInternalTx> {
/** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
+ private final int futId;
+
+ /**
+ * @param futId Future ID.
+ */
+ MinFuture(int futId) {
+ this.futId = futId;
+ }
/**
* @param nodeId Node ID.
@@ -846,7 +862,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
/**
* @return Future ID.
*/
- final IgniteUuid futureId() {
+ final int futureId() {
return futId;
}
}
@@ -863,9 +879,12 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
private GridDistributedTxMapping m;
/**
+ * @param futId Future ID.
* @param m Mapping.
*/
- FinishMiniFuture(GridDistributedTxMapping m) {
+ FinishMiniFuture(int futId, GridDistributedTxMapping m) {
+ super(futId);
+
this.m = m;
}
@@ -898,9 +917,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
Collection<UUID> backups = txNodes.get(nodeId);
if (!F.isEmpty(backups)) {
- final CheckRemoteTxMiniFuture mini = new CheckRemoteTxMiniFuture(new HashSet<>(backups));
+ final CheckRemoteTxMiniFuture mini;
+
+ synchronized (sync) {
+ int futId = Integer.MIN_VALUE + futuresCountNoLock();
- add(mini);
+ mini = new CheckRemoteTxMiniFuture(futId, new HashSet<>(backups));
+
+ add(mini);
+ }
GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId(), true);
@@ -972,10 +997,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
private ClusterNode backup;
/**
+ * @param futId Future ID.
* @param backup Backup to check.
* @param m Mapping associated with the backup.
*/
- CheckBackupMiniFuture(ClusterNode backup, GridDistributedTxMapping m) {
+ CheckBackupMiniFuture(int futId, ClusterNode backup, GridDistributedTxMapping m) {
+ super(futId);
+
this.backup = backup;
this.m = m;
}
@@ -1033,9 +1061,12 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
private Set<UUID> nodes;
/**
+ * @param futId Future ID.
* @param nodes Backup nodes.
*/
- CheckRemoteTxMiniFuture(Set<UUID> nodes) {
+ CheckRemoteTxMiniFuture(int futId, Set<UUID> nodes) {
+ super(futId);
+
this.nodes = nodes;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/763f1b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index dfbbe18..05c1f3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -40,25 +40,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
private static final long serialVersionUID = 0L;
/** Mini future ID. */
- private IgniteUuid miniId;
-
- /** Explicit lock flag. */
- private boolean explicitLock;
-
- /** Store enabled flag. */
- private boolean storeEnabled;
-
- /** Topology version. */
- private AffinityTopologyVersion topVer;
-
- /** Subject ID. */
- private UUID subjId;
-
- /** Task name hash. */
- private int taskNameHash;
-
- /** Write synchronization mode. */
- private CacheWriteSynchronizationMode syncMode;
+ private int miniId;
/**
* Empty constructor required for {@link Externalizable}.
@@ -109,83 +91,69 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
super(
xidVer,
futId,
+ topVer,
null,
threadId,
commit,
invalidate,
sys,
plc,
- syncMode == CacheWriteSynchronizationMode.FULL_SYNC,
- syncMode == CacheWriteSynchronizationMode.FULL_SYNC,
+ syncMode,
baseVer,
committedVers,
rolledbackVers,
+ subjId,
+ taskNameHash,
txSize,
addDepInfo
);
- this.syncMode = syncMode;
- this.explicitLock = explicitLock;
- this.storeEnabled = storeEnabled;
- this.topVer = topVer;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
+ explicitLock(explicitLock);
+ storeEnabled(storeEnabled);
}
/**
- * @return Transaction write synchronization mode (can be null is message sent from old nodes).
+ * @return Explicit lock flag.
*/
- @Nullable public CacheWriteSynchronizationMode syncMode() {
- return syncMode;
+ public boolean explicitLock() {
+ return isFlag(EXPLICIT_LOCK_FLAG_MASK);
}
/**
- * @return Explicit lock flag.
+ * @param explicitLock Explicit lock flag.
*/
- public boolean explicitLock() {
- return explicitLock;
+ private void explicitLock(boolean explicitLock) {
+ setFlag(explicitLock, EXPLICIT_LOCK_FLAG_MASK);
}
/**
* @return Store enabled flag.
*/
public boolean storeEnabled() {
- return storeEnabled;
+ return isFlag(STORE_ENABLED_FLAG_MASK);
}
/**
- * @return Mini future ID.
+ * @param storeEnabled Store enabled flag.
*/
- public IgniteUuid miniId() {
- return miniId;
+ private void storeEnabled(boolean storeEnabled) {
+ setFlag(storeEnabled, STORE_ENABLED_FLAG_MASK);
}
/**
- * @param miniId Mini future ID.
+ * @return Mini future ID.
*/
- public void miniId(IgniteUuid miniId) {
- this.miniId = miniId;
+ public int miniId() {
+ return miniId;
}
/**
- * @return Subject ID.
+ * @param miniId Mini future ID.
*/
- @Nullable public UUID subjectId() {
- return subjId;
- }
+ public void miniId(int miniId) {
+ assert miniId > 0;
- /**
- * @return Task name hash.
- */
- public int taskNameHash() {
- return taskNameHash;
- }
-
- /**
- * @return Topology version.
- */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
+ this.miniId = miniId;
}
/** {@inheritDoc} */
@@ -203,44 +171,8 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
}
switch (writer.state()) {
- case 18:
- if (!writer.writeBoolean("explicitLock", explicitLock))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeIgniteUuid("miniId", miniId))
- return false;
-
- writer.incrementState();
-
- case 20:
- if (!writer.writeBoolean("storeEnabled", storeEnabled))
- return false;
-
- writer.incrementState();
-
case 21:
- if (!writer.writeUuid("subjId", subjId))
- return false;
-
- writer.incrementState();
-
- case 22:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 23:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 24:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
@@ -261,60 +193,8 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
return false;
switch (reader.state()) {
- case 18:
- explicitLock = reader.readBoolean("explicitLock");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- miniId = reader.readIgniteUuid("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 20:
- storeEnabled = reader.readBoolean("storeEnabled");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
case 21:
- subjId = reader.readUuid("subjId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 22:
- byte syncModeOrd;
-
- syncModeOrd = reader.readByte("syncMode");
-
- if (!reader.isLastRead())
- return false;
-
- syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
- reader.incrementState();
-
- case 23:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 24:
- topVer = reader.readMessage("topVer");
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
@@ -333,7 +213,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 25;
+ return 22;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/763f1b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
index 37fbb36..e9043ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
@@ -46,7 +46,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
private byte[] errBytes;
/** Mini future ID. */
- private IgniteUuid miniId;
+ private int miniId;
/** Near tx thread ID. */
private long nearThreadId;
@@ -65,11 +65,11 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
* @param miniId Mini future Id.
* @param err Error.
*/
- public GridNearTxFinishResponse(GridCacheVersion xid, long nearThreadId, IgniteUuid futId, IgniteUuid miniId,
+ public GridNearTxFinishResponse(GridCacheVersion xid, long nearThreadId, IgniteUuid futId, int miniId,
@Nullable Throwable err) {
super(xid, futId);
- assert miniId != null;
+ assert miniId != 0;
this.nearThreadId = nearThreadId;
this.miniId = miniId;
@@ -84,7 +84,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
@@ -134,7 +134,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
writer.incrementState();
case 6:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
@@ -170,7 +170,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
reader.incrementState();
case 6:
- miniId = reader.readIgniteUuid("miniId");
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/763f1b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index 9dad722..66fe902 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -102,6 +102,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param futId Future ID.
* @param miniId Mini future ID.
@@ -113,6 +114,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
* @param addDepInfo Deployment info flag.
*/
public GridNearTxPrepareResponse(
+ int part,
GridCacheVersion xid,
IgniteUuid futId,
int miniId,
@@ -123,7 +125,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
AffinityTopologyVersion clientRemapVer,
boolean addDepInfo
) {
- super(xid, err, addDepInfo);
+ super(part, xid, err, addDepInfo);
assert futId != null;
assert dhtVer != null;
@@ -342,61 +344,61 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
}
switch (writer.state()) {
- case 8:
+ case 10:
if (!writer.writeMessage("clientRemapVer", clientRemapVer))
return false;
writer.incrementState();
- case 9:
+ case 11:
if (!writer.writeMessage("dhtVer", dhtVer))
return false;
writer.incrementState();
- case 10:
+ case 12:
if (!writer.writeCollection("filterFailedKeys", filterFailedKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 11:
+ case 13:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 12:
+ case 14:
if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
- case 13:
+ case 15:
if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 14:
+ case 16:
if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 15:
+ case 17:
if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 16:
+ case 18:
if (!writer.writeMessage("retVal", retVal))
return false;
writer.incrementState();
- case 17:
+ case 19:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -418,7 +420,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
return false;
switch (reader.state()) {
- case 8:
+ case 10:
clientRemapVer = reader.readMessage("clientRemapVer");
if (!reader.isLastRead())
@@ -426,7 +428,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 9:
+ case 11:
dhtVer = reader.readMessage("dhtVer");
if (!reader.isLastRead())
@@ -434,7 +436,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 10:
+ case 12:
filterFailedKeys = reader.readCollection("filterFailedKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -442,7 +444,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 11:
+ case 13:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -450,7 +452,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 12:
+ case 14:
miniId = reader.readInt("miniId");
if (!reader.isLastRead())
@@ -458,7 +460,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 13:
+ case 15:
ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -466,7 +468,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 14:
+ case 16:
ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -474,7 +476,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 15:
+ case 17:
pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -482,7 +484,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 16:
+ case 18:
retVal = reader.readMessage("retVal");
if (!reader.isLastRead())
@@ -490,7 +492,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 17:
+ case 19:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -510,12 +512,11 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 18;
+ return 20;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNearTxPrepareResponse.class, this, "super", super.toString());
}
-
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/763f1b0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index f3f67a2..a3f1356 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -271,6 +271,7 @@ public class IgniteTxHandler {
U.error(log, "Failed to prepare DHT transaction: " + locTx, e);
return new GridNearTxPrepareResponse(
+ req.partition(),
req.version(),
req.futureId(),
req.miniId(),
@@ -382,6 +383,7 @@ public class IgniteTxHandler {
}
GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+ req.partition(),
req.version(),
req.futureId(),
req.miniId(),
@@ -836,14 +838,9 @@ public class IgniteTxHandler {
try {
assert tx != null : "Transaction is null for near finish request [nodeId=" +
nodeId + ", req=" + req + "]";
+ assert req.syncMode() != null : req;
- if (req.syncMode() == null) {
- boolean sync = req.commit() ? req.syncCommit() : req.syncRollback();
-
- tx.syncMode(sync ? FULL_SYNC : FULL_ASYNC);
- }
- else
- tx.syncMode(req.syncMode());
+ tx.syncMode(req.syncMode());
if (req.commit()) {
tx.storeEnabled(req.storeEnabled());
@@ -937,7 +934,7 @@ public class IgniteTxHandler {
* @param nodeId Sender node ID.
* @param req Request.
*/
- protected final void processDhtTxPrepareRequest(final UUID nodeId, final GridDhtTxPrepareRequest req) {
+ private void processDhtTxPrepareRequest(final UUID nodeId, final GridDhtTxPrepareRequest req) {
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Received dht prepare request [txId=" + req.nearXidVersion() +
", dhtTxId=" + req.version() +
@@ -955,7 +952,12 @@ public class IgniteTxHandler {
GridDhtTxPrepareResponse res;
try {
- res = new GridDhtTxPrepareResponse(req.version(), req.futureId(), req.miniId(), req.deployInfo() != null);
+ res = new GridDhtTxPrepareResponse(
+ req.partition(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.deployInfo() != null);
// Start near transaction first.
nearTx = !F.isEmpty(req.nearWrites()) ? startNearRemoteTx(ctx.deploy().globalLoader(), nodeId, req) : null;
@@ -1007,7 +1009,12 @@ public class IgniteTxHandler {
if (nearTx != null)
nearTx.rollback();
- res = new GridDhtTxPrepareResponse(req.version(), req.futureId(), req.miniId(), e,
+ res = new GridDhtTxPrepareResponse(
+ req.partition(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ e,
req.deployInfo() != null);
}
@@ -1058,7 +1065,7 @@ public class IgniteTxHandler {
* @param nodeId Node ID.
* @param req Request.
*/
- protected final void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId,
+ private void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId,
final GridDhtTxOnePhaseCommitAckRequest req) {
assert nodeId != null;
assert req != null;
@@ -1075,14 +1082,14 @@ public class IgniteTxHandler {
* @param req Request.
*/
@SuppressWarnings({"unchecked"})
- protected final void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest req) {
+ private void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest req) {
assert nodeId != null;
assert req != null;
if (req.checkCommitted()) {
boolean committed = req.waitRemoteTransactions() || !ctx.tm().addRolledbackTx(null, req.version());
- if (!committed || !req.syncCommit())
+ if (!committed || req.syncMode() != FULL_SYNC)
sendReply(nodeId, req, committed, null);
else {
IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version());