You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/14 15:00:29 UTC
[35/40] ignite git commit: Changed tx mini future ids from IgniteUuid to int, removed some legacy code from tx processing.
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index a4132f2..4a443a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -44,7 +45,6 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -77,7 +77,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
for (IgniteInternalFuture<?> fut : futures()) {
MiniFuture f = (MiniFuture)fut;
- if (f.node().id().equals(nodeId)) {
+ if (f.primary().id().equals(nodeId)) {
ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " +
nodeId);
@@ -100,7 +100,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
MiniFuture f = miniFuture(res.miniId());
if (f != null) {
- assert f.node().id().equals(nodeId);
+ assert f.primary().id().equals(nodeId);
f.onResult(res);
}
@@ -130,16 +130,16 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
* @return Mini future.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- private MiniFuture miniFuture(IgniteUuid miniId) {
+ private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
synchronized (sync) {
int size = futuresCountNoLock();
// Avoid iterator creation.
for (int i = 0; i < size; i++) {
- MiniFuture mini = (MiniFuture) future(i);
+ MiniFuture mini = (MiniFuture)future(i);
- if (mini.futureId().equals(miniId)) {
+ if (mini.futureId() == miniId) {
if (!mini.isDone())
return mini;
else
@@ -188,16 +188,22 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
AffinityTopologyVersion topVer = tx.topologyVersion();
- txMapping = new GridDhtTxMapping();
+ GridDhtTxMapping txMapping = new GridDhtTxMapping();
for (IgniteTxEntry txEntry : tx.allEntries()) {
txEntry.clearEntryReadVersion();
GridCacheContext cacheCtx = txEntry.context();
- List<ClusterNode> nodes = cacheCtx.isLocal() ?
- cacheCtx.affinity().nodesByKey(txEntry.key(), topVer) :
- cacheCtx.topology().nodes(cacheCtx.affinity().partition(txEntry.key()), topVer);
+ List<ClusterNode> nodes;
+
+ if (!cacheCtx.isLocal()) {
+ GridDhtPartitionTopology top = cacheCtx.topology();
+
+ nodes = top.nodes(cacheCtx.affinity().partition(txEntry.key()), topVer);
+ }
+ else
+ nodes = cacheCtx.affinity().nodesByKey(txEntry.key(), topVer);
ClusterNode primary = F.first(nodes);
@@ -224,15 +230,20 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
tx.transactionNodes(txMapping.transactionNodes());
- checkOnePhase();
+ checkOnePhase(txMapping);
long timeout = tx.remainingTime();
- if (timeout == -1)
+ if (timeout == -1) {
onDone(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + tx));
+ return;
+ }
+
+ int miniId = 0;
+
for (final GridDistributedTxMapping m : mappings.values()) {
- final ClusterNode node = m.node();
+ final ClusterNode primary = m.primary();
GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
futId,
@@ -258,14 +269,14 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
req.addDhtVersion(txEntry.txKey(), null);
}
- final MiniFuture fut = new MiniFuture(m);
+ final MiniFuture fut = new MiniFuture(m, ++miniId);
req.miniId(fut.futureId());
add(fut);
- if (node.isLocal()) {
- IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(node.id(),
+ if (primary.isLocal()) {
+ IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(primary.id(),
tx,
req);
@@ -282,11 +293,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
}
else {
try {
- cctx.io().send(node, req, tx.ioPolicy());
+ cctx.io().send(primary, req, tx.ioPolicy());
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near pessimistic prepare, sent request [txId=" + tx.nearXidVersion() +
- ", node=" + node.id() + ']');
+ ", node=" + primary.id() + ']');
}
}
catch (ClusterTopologyCheckedException e) {
@@ -297,7 +308,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
catch (IgniteCheckedException e) {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near pessimistic prepare, failed send request [txId=" + tx.nearXidVersion() +
- ", node=" + node.id() + ", err=" + e + ']');
+ ", node=" + primary.id() + ", err=" + e + ']');
}
fut.onError(e);
@@ -338,8 +349,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
@Override public String toString() {
Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
@Override public String apply(IgniteInternalFuture<?> f) {
- return "[node=" + ((MiniFuture)f).node().id() +
- ", loc=" + ((MiniFuture)f).node().isLocal() +
+ return "[node=" + ((MiniFuture)f).primary().id() +
+ ", loc=" + ((MiniFuture)f).primary().isLocal() +
", done=" + f.isDone() + "]";
}
});
@@ -357,30 +368,32 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
private static final long serialVersionUID = 0L;
/** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
+ private final int futId;
/** */
private GridDistributedTxMapping m;
/**
* @param m Mapping.
+ * @param futId Mini future ID.
*/
- MiniFuture(GridDistributedTxMapping m) {
+ MiniFuture(GridDistributedTxMapping m, int futId) {
this.m = m;
+ this.futId = futId;
}
/**
* @return Future ID.
*/
- IgniteUuid futureId() {
+ int futureId() {
return futId;
}
/**
* @return Node ID.
*/
- public ClusterNode node() {
- return m.node();
+ public ClusterNode primary() {
+ return m.primary();
}
/**
@@ -402,7 +415,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
void onNodeLeft(ClusterTopologyCheckedException e) {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near pessimistic prepare, mini future node left [txId=" + tx.nearXidVersion() +
- ", nodeId=" + m.node().id() + ']');
+ ", nodeId=" + m.primary().id() + ']');
}
if (tx.onePhaseCommit()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
index 1a925f3..994172b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteCodeGeneratingFail;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
@@ -38,25 +37,24 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_ST
/**
*
*/
-@IgniteCodeGeneratingFail // partId read should not be generated by MessageCodeGenerator.
public class GridNearSingleGetRequest extends GridCacheMessage implements GridCacheDeployable {
/** */
private static final long serialVersionUID = 0L;
/** */
- public static final int READ_THROUGH_FLAG_MASK = 0x01;
+ private static final int READ_THROUGH_FLAG_MASK = 0x01;
/** */
- public static final int SKIP_VALS_FLAG_MASK = 0x02;
+ private static final int SKIP_VALS_FLAG_MASK = 0x02;
/** */
- public static final int ADD_READER_FLAG_MASK = 0x04;
+ private static final int ADD_READER_FLAG_MASK = 0x04;
/** */
- public static final int NEED_VER_FLAG_MASK = 0x08;
+ private static final int NEED_VER_FLAG_MASK = 0x08;
/** */
- public static final int NEED_ENTRY_INFO_FLAG_MASK = 0x10;
+ private static final int NEED_ENTRY_INFO_FLAG_MASK = 0x10;
/** Future ID. */
private long futId;
@@ -64,9 +62,6 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa
/** */
private KeyCacheObject key;
- /** Partition ID. */
- private int partId = -1;
-
/** Flags. */
private byte flags;
@@ -128,7 +123,6 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa
this.cacheId = cacheId;
this.futId = futId;
this.key = key;
- this.partId = key.partition();
this.topVer = topVer;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
@@ -202,7 +196,9 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa
/** {@inheritDoc} */
@Override public int partition() {
- return partId;
+ assert key != null;
+
+ return key.partition();
}
/**
@@ -257,8 +253,6 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa
assert key != null;
- key.partition(partId);
-
GridCacheContext cctx = ctx.cacheContext(cacheId);
key.finishUnmarshal(cctx.cacheObjectContext(), ldr);
@@ -316,14 +310,6 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa
reader.incrementState();
case 8:
- partId = reader.readInt("partId", -1);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -331,7 +317,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa
reader.incrementState();
- case 10:
+ case 9:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -339,7 +325,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa
reader.incrementState();
- case 11:
+ case 10:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -398,24 +384,18 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa
writer.incrementState();
case 8:
- if (!writer.writeInt("partId", partId))
- return false;
-
- writer.incrementState();
-
- case 9:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 10:
+ case 9:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 11:
+ case 10:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -438,7 +418,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 12;
+ return 11;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 512f63e..7387501 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -54,12 +55,11 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionRollbackException;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
@@ -69,18 +69,6 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFuture<IgniteInternalTx>
implements GridCacheFuture<IgniteInternalTx> {
/** */
- public static final IgniteProductVersion FINISH_NEAR_ONE_PHASE_SINCE = IgniteProductVersion.fromString("1.4.0");
-
- /** */
- public static final IgniteProductVersion WAIT_REMOTE_TXS_SINCE = IgniteProductVersion.fromString("1.5.1");
-
- /** */
- public static final IgniteProductVersion PRIMARY_SYNC_TXS_SINCE = IgniteProductVersion.fromString("1.6.0");
-
- /** */
- public static final IgniteProductVersion ACK_DHT_ONE_PHASE_SINCE = IgniteProductVersion.fromString("1.6.8");
-
- /** */
private static final long serialVersionUID = 0L;
/** Logger reference. */
@@ -157,7 +145,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
@Override public boolean onNodeLeft(UUID nodeId) {
boolean found = false;
- for (IgniteInternalFuture<?> fut : futures())
+ for (IgniteInternalFuture<?> fut : futures()) {
if (isMini(fut)) {
MinFuture f = (MinFuture)fut;
@@ -168,6 +156,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
found = true;
}
}
+ }
return found;
}
@@ -209,8 +198,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (fut.getClass() == FinishMiniFuture.class) {
FinishMiniFuture f = (FinishMiniFuture)fut;
- if (f.futureId().equals(res.miniId())) {
- assert f.node().id().equals(nodeId);
+ if (f.futureId() == res.miniId()) {
+ assert f.primary().id().equals(nodeId);
finishFut = f;
@@ -253,7 +242,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (fut.getClass() == CheckBackupMiniFuture.class) {
CheckBackupMiniFuture f = (CheckBackupMiniFuture)fut;
- if (f.futureId().equals(res.miniId())) {
+ if (f.futureId() == res.miniId()) {
found = true;
assert f.node().id().equals(nodeId);
@@ -267,7 +256,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
else if (fut.getClass() == CheckRemoteTxMiniFuture.class) {
CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
- if (f.futureId().equals(res.miniId()))
+ if (f.futureId() == res.miniId())
f.onDhtFinishResponse(nodeId, false);
}
}
@@ -298,9 +287,14 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (isDone())
return false;
- if (err != null)
+ boolean nodeStop = false;
+
+ if (err != null) {
tx.setRollbackOnly();
+ nodeStop = err instanceof NodeStoppingException;
+ }
+
if (commit) {
if (tx.commitError() != null)
err = tx.commitError();
@@ -329,7 +323,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (tx.onePhaseCommit()) {
boolean commit = this.commit && err == null;
- finishOnePhase(commit);
+ if (!nodeStop)
+ finishOnePhase(commit);
try {
tx.tmFinish(commit);
@@ -412,8 +407,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (mappings.single()) {
GridDistributedTxMapping mapping = mappings.singleMapping();
- if (mapping != null)
- finish(mapping, commit);
+ if (mapping != null) {
+ assert !hasFutures() : futures();
+
+ finish(1, mapping, commit);
+ }
}
else
finish(mappings.mappings(), commit);
@@ -453,7 +451,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
GridDistributedTxMapping mapping = mappings.singleMapping();
if (mapping != null) {
- UUID nodeId = mapping.node().id();
+ UUID nodeId = mapping.primary().id();
Collection<UUID> backups = tx.transactionNodes().get(nodeId);
@@ -470,10 +468,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
else if (backup.isLocal())
cctx.tm().removeTxReturn(tx.xidVersion());
- else {
- if (ACK_DHT_ONE_PHASE_SINCE.compareToIgnoreTimestamp(backup.version()) <= 0)
- cctx.tm().sendDeferredAckResponse(backupId, tx.xidVersion());
- }
+ else
+ cctx.tm().sendDeferredAckResponse(backupId, tx.xidVersion());
}
}
}
@@ -482,10 +478,12 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
*
*/
private void checkBackup() {
+ assert !hasFutures() : futures();
+
GridDistributedTxMapping mapping = mappings.singleMapping();
if (mapping != null) {
- UUID nodeId = mapping.node().id();
+ UUID nodeId = mapping.primary().id();
Collection<UUID> backups = tx.transactionNodes().get(nodeId);
@@ -509,7 +507,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
"(backup has left grid): " + tx.xidVersion(), cause));
}
else {
- final CheckBackupMiniFuture mini = new CheckBackupMiniFuture(backup, mapping);
+ final CheckBackupMiniFuture mini = new CheckBackupMiniFuture(1, backup, mapping);
add(mini);
@@ -575,24 +573,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
else {
GridDhtTxFinishRequest finishReq = checkCommittedRequest(mini.futureId(), false);
- // Preserve old behavior, otherwise response is not sent.
- if (WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) > 0)
- finishReq.syncCommit(true);
-
try {
- if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(backup.version()) <= 0) {
- cctx.io().send(backup, finishReq, tx.ioPolicy());
+ cctx.io().send(backup, finishReq, tx.ioPolicy());
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Near finish fut, sent check committed request [" +
- "txId=" + tx.nearXidVersion() +
- ", node=" + backup.id() + ']');
- }
- }
- else {
- mini.onDone(new IgniteTxHeuristicCheckedException("Failed to check for tx commit on " +
- "the backup node (node has an old Ignite version) [rmtNodeId=" + backup.id() +
- ", ver=" + backup.version() + ']'));
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Near finish fut, sent check committed request [" +
+ "txId=" + tx.nearXidVersion() +
+ ", node=" + backup.id() + ']');
}
}
catch (ClusterTopologyCheckedException ignored) {
@@ -624,18 +611,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (tx.mappings().empty())
return false;
- boolean finish = tx.txState().hasNearCache(cctx) || !commit;
-
- if (finish) {
- GridDistributedTxMapping mapping = tx.mappings().singleMapping();
-
- assert mapping != null : tx;
-
- if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(mapping.node().version()) > 0)
- finish = false;
- }
-
- return finish;
+ return tx.txState().hasNearCache(cctx) || !commit;
}
/**
@@ -683,17 +659,22 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* @param commit Commit flag.
*/
private void finish(Iterable<GridDistributedTxMapping> mappings, boolean commit) {
+ assert !hasFutures() : futures();
+
+ int miniId = 0;
+
// Create mini futures.
for (GridDistributedTxMapping m : mappings)
- finish(m, commit);
+ finish(++miniId, m, commit);
}
/**
+ * @param miniId Mini future ID.
* @param m Mapping.
* @param commit Commit flag.
*/
- private void finish(GridDistributedTxMapping m, boolean commit) {
- ClusterNode n = m.node();
+ private void finish(int miniId, GridDistributedTxMapping m, boolean commit) {
+ ClusterNode n = m.primary();
assert !m.empty();
@@ -728,7 +709,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
// If this is the primary node for the keys.
if (n.isLocal()) {
- req.miniId(IgniteUuid.randomUuid());
+ req.miniId(miniId);
IgniteInternalFuture<IgniteInternalTx> fut = cctx.tm().txHandler().finish(n.id(), tx, req);
@@ -737,7 +718,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
add(fut);
}
else {
- FinishMiniFuture fut = new FinishMiniFuture(m);
+ FinishMiniFuture fut = new FinishMiniFuture(miniId, m);
req.miniId(fut.futureId());
@@ -755,12 +736,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
", node=" + n.id() + ']');
}
- boolean wait;
-
- if (syncMode == PRIMARY_SYNC)
- wait = n.version().compareToIgnoreTimestamp(PRIMARY_SYNC_TXS_SINCE) >= 0;
- else
- wait = syncMode == FULL_SYNC;
+ boolean wait = syncMode != FULL_ASYNC;
// If we don't wait for result, then mark future as done.
if (!wait)
@@ -768,7 +744,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
catch (ClusterTopologyCheckedException ignored) {
// Remove previous mapping.
- mappings.remove(m.node().id());
+ mappings.remove(m.primary().id());
fut.onNodeLeft(n.id(), false);
}
@@ -794,7 +770,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (f.getClass() == FinishMiniFuture.class) {
FinishMiniFuture fut = (FinishMiniFuture)f;
- ClusterNode node = fut.node();
+ ClusterNode node = fut.primary();
if (node != null) {
return "FinishFuture[node=" + node.id() +
@@ -837,7 +813,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* @param waitRemoteTxs Wait for remote txs.
* @return Finish request.
*/
- private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId, boolean waitRemoteTxs) {
+ private GridDhtTxFinishRequest checkCommittedRequest(int miniId, boolean waitRemoteTxs) {
GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
cctx.localNodeId(),
futureId(),
@@ -852,8 +828,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
tx.system(),
tx.ioPolicy(),
false,
- tx.syncMode() == FULL_SYNC,
- tx.syncMode() == FULL_SYNC,
+ tx.syncMode(),
null,
null,
null,
@@ -875,7 +850,14 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
*/
private abstract class MinFuture extends GridFutureAdapter<IgniteInternalTx> {
/** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
+ private final int futId;
+
+ /**
+ * @param futId Future ID.
+ */
+ MinFuture(int futId) {
+ this.futId = futId;
+ }
/**
* @param nodeId Node ID.
@@ -887,14 +869,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
/**
* @return Future ID.
*/
- final IgniteUuid futureId() {
+ final int futureId() {
return futId;
}
}
/**
- * Mini-future for get operations. Mini-futures are only waiting on a single
- * node as opposed to multiple nodes.
+ *
*/
private class FinishMiniFuture extends MinFuture {
/** */
@@ -905,17 +886,20 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
private GridDistributedTxMapping m;
/**
+ * @param futId Future ID.
* @param m Mapping.
*/
- FinishMiniFuture(GridDistributedTxMapping m) {
+ FinishMiniFuture(int futId, GridDistributedTxMapping m) {
+ super(futId);
+
this.m = m;
}
/**
* @return Node ID.
*/
- ClusterNode node() {
- return m.node();
+ ClusterNode primary() {
+ return m.primary();
}
/**
@@ -927,10 +911,10 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
/** {@inheritDoc} */
boolean onNodeLeft(UUID nodeId, boolean discoThread) {
- if (nodeId.equals(m.node().id())) {
+ if (nodeId.equals(m.primary().id())) {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near finish fut, mini future node left [txId=" + tx.nearXidVersion() +
- ", node=" + m.node().id() + ']');
+ ", node=" + m.primary().id() + ']');
}
if (tx.syncMode() == FULL_SYNC) {
@@ -940,16 +924,22 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
Collection<UUID> backups = txNodes.get(nodeId);
if (!F.isEmpty(backups)) {
- final CheckRemoteTxMiniFuture mini = new CheckRemoteTxMiniFuture(new HashSet<>(backups));
+ final CheckRemoteTxMiniFuture mini;
+
+ synchronized (sync) {
+ int futId = Integer.MIN_VALUE + futuresCountNoLock();
+
+ mini = new CheckRemoteTxMiniFuture(futId, new HashSet<>(backups));
- add(mini);
+ add(mini);
+ }
GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId(), true);
for (UUID backupId : backups) {
ClusterNode backup = cctx.discovery().node(backupId);
- if (backup != null && WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) <= 0) {
+ if (backup != null) {
if (backup.isLocal()) {
IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(tx.nearXidVersion());
@@ -1014,10 +1004,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
private ClusterNode backup;
/**
+ * @param futId Future ID.
* @param backup Backup to check.
* @param m Mapping associated with the backup.
*/
- CheckBackupMiniFuture(ClusterNode backup, GridDistributedTxMapping m) {
+ CheckBackupMiniFuture(int futId, ClusterNode backup, GridDistributedTxMapping m) {
+ super(futId);
+
this.backup = backup;
this.m = m;
}
@@ -1075,9 +1068,12 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
private Set<UUID> nodes;
/**
+ * @param futId Future ID.
* @param nodes Backup nodes.
*/
- public CheckRemoteTxMiniFuture(Set<UUID> nodes) {
+ CheckRemoteTxMiniFuture(int futId, Set<UUID> nodes) {
+ super(futId);
+
this.nodes = nodes;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index dfbbe18..05c1f3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -40,25 +40,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
private static final long serialVersionUID = 0L;
/** Mini future ID. */
- private IgniteUuid miniId;
-
- /** Explicit lock flag. */
- private boolean explicitLock;
-
- /** Store enabled flag. */
- private boolean storeEnabled;
-
- /** Topology version. */
- private AffinityTopologyVersion topVer;
-
- /** Subject ID. */
- private UUID subjId;
-
- /** Task name hash. */
- private int taskNameHash;
-
- /** Write synchronization mode. */
- private CacheWriteSynchronizationMode syncMode;
+ private int miniId;
/**
* Empty constructor required for {@link Externalizable}.
@@ -109,83 +91,69 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
super(
xidVer,
futId,
+ topVer,
null,
threadId,
commit,
invalidate,
sys,
plc,
- syncMode == CacheWriteSynchronizationMode.FULL_SYNC,
- syncMode == CacheWriteSynchronizationMode.FULL_SYNC,
+ syncMode,
baseVer,
committedVers,
rolledbackVers,
+ subjId,
+ taskNameHash,
txSize,
addDepInfo
);
- this.syncMode = syncMode;
- this.explicitLock = explicitLock;
- this.storeEnabled = storeEnabled;
- this.topVer = topVer;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
+ explicitLock(explicitLock);
+ storeEnabled(storeEnabled);
}
/**
- * @return Transaction write synchronization mode (can be null is message sent from old nodes).
+ * @return Explicit lock flag.
*/
- @Nullable public CacheWriteSynchronizationMode syncMode() {
- return syncMode;
+ public boolean explicitLock() {
+ return isFlag(EXPLICIT_LOCK_FLAG_MASK);
}
/**
- * @return Explicit lock flag.
+ * @param explicitLock Explicit lock flag.
*/
- public boolean explicitLock() {
- return explicitLock;
+ private void explicitLock(boolean explicitLock) {
+ setFlag(explicitLock, EXPLICIT_LOCK_FLAG_MASK);
}
/**
* @return Store enabled flag.
*/
public boolean storeEnabled() {
- return storeEnabled;
+ return isFlag(STORE_ENABLED_FLAG_MASK);
}
/**
- * @return Mini future ID.
+ * @param storeEnabled Store enabled flag.
*/
- public IgniteUuid miniId() {
- return miniId;
+ private void storeEnabled(boolean storeEnabled) {
+ setFlag(storeEnabled, STORE_ENABLED_FLAG_MASK);
}
/**
- * @param miniId Mini future ID.
+ * @return Mini future ID.
*/
- public void miniId(IgniteUuid miniId) {
- this.miniId = miniId;
+ public int miniId() {
+ return miniId;
}
/**
- * @return Subject ID.
+ * @param miniId Mini future ID.
*/
- @Nullable public UUID subjectId() {
- return subjId;
- }
+ public void miniId(int miniId) {
+ assert miniId > 0;
- /**
- * @return Task name hash.
- */
- public int taskNameHash() {
- return taskNameHash;
- }
-
- /**
- * @return Topology version.
- */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
+ this.miniId = miniId;
}
/** {@inheritDoc} */
@@ -203,44 +171,8 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
}
switch (writer.state()) {
- case 18:
- if (!writer.writeBoolean("explicitLock", explicitLock))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeIgniteUuid("miniId", miniId))
- return false;
-
- writer.incrementState();
-
- case 20:
- if (!writer.writeBoolean("storeEnabled", storeEnabled))
- return false;
-
- writer.incrementState();
-
case 21:
- if (!writer.writeUuid("subjId", subjId))
- return false;
-
- writer.incrementState();
-
- case 22:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 23:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 24:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
@@ -261,60 +193,8 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
return false;
switch (reader.state()) {
- case 18:
- explicitLock = reader.readBoolean("explicitLock");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- miniId = reader.readIgniteUuid("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 20:
- storeEnabled = reader.readBoolean("storeEnabled");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
case 21:
- subjId = reader.readUuid("subjId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 22:
- byte syncModeOrd;
-
- syncModeOrd = reader.readByte("syncMode");
-
- if (!reader.isLastRead())
- return false;
-
- syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
- reader.incrementState();
-
- case 23:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 24:
- topVer = reader.readMessage("topVer");
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
@@ -333,7 +213,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 25;
+ return 22;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
index 37fbb36..310e90d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
@@ -46,7 +46,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
private byte[] errBytes;
/** Mini future ID. */
- private IgniteUuid miniId;
+ private int miniId;
/** Near tx thread ID. */
private long nearThreadId;
@@ -59,17 +59,23 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param nearThreadId Near tx thread ID.
* @param futId Future ID.
* @param miniId Mini future Id.
* @param err Error.
*/
- public GridNearTxFinishResponse(GridCacheVersion xid, long nearThreadId, IgniteUuid futId, IgniteUuid miniId,
- @Nullable Throwable err) {
- super(xid, futId);
+ public GridNearTxFinishResponse(int part,
+ GridCacheVersion xid,
+ long nearThreadId,
+ IgniteUuid futId,
+ int miniId,
+ @Nullable Throwable err)
+ {
+ super(part, xid, futId);
- assert miniId != null;
+ assert miniId != 0;
this.nearThreadId = nearThreadId;
this.miniId = miniId;
@@ -84,7 +90,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
@@ -127,19 +133,19 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
}
switch (writer.state()) {
- case 5:
+ case 7:
if (!writer.writeByteArray("errBytes", errBytes))
return false;
writer.incrementState();
- case 6:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ case 8:
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
- case 7:
+ case 9:
if (!writer.writeLong("nearThreadId", nearThreadId))
return false;
@@ -161,7 +167,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
return false;
switch (reader.state()) {
- case 5:
+ case 7:
errBytes = reader.readByteArray("errBytes");
if (!reader.isLastRead())
@@ -169,15 +175,15 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
reader.incrementState();
- case 6:
- miniId = reader.readIgniteUuid("miniId");
+ case 8:
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 7:
+ case 9:
nearThreadId = reader.readLong("nearThreadId");
if (!reader.isLastRead())
@@ -197,7 +203,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 8;
+ return 10;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index a97b0fe..8ed749c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -28,6 +28,7 @@ import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -577,12 +578,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
void addEntryMapping(@Nullable Collection<GridDistributedTxMapping> maps) {
if (!F.isEmpty(maps)) {
for (GridDistributedTxMapping map : maps) {
- ClusterNode n = map.node();
+ ClusterNode primary = map.primary();
- GridDistributedTxMapping m = mappings.get(n.id());
+ GridDistributedTxMapping m = mappings.get(primary.id());
if (m == null) {
- mappings.put(m = new GridDistributedTxMapping(n));
+ mappings.put(m = new GridDistributedTxMapping(primary));
m.near(map.near());
@@ -605,7 +606,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
* @param entry Entry.
*/
void addSingleEntryMapping(GridDistributedTxMapping map, IgniteTxEntry entry) {
- ClusterNode n = map.node();
+ ClusterNode n = map.primary();
GridDistributedTxMapping m = new GridDistributedTxMapping(n);
@@ -883,7 +884,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
catch (IgniteCheckedException e) {
COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e);
- fut0.finish(false);
+ if (!(e instanceof NodeStoppingException))
+ fut0.finish(false);
}
}
});
@@ -1000,7 +1002,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
cctx,
this,
timeout,
- IgniteUuid.randomUuid(),
+ 0,
Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
last,
needReturnValue() && implicit());
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 6b95309..a0f28c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -95,9 +95,6 @@ public abstract class GridNearTxPrepareFutureAdapter extends
/** Trackable flag. */
protected boolean trackable = true;
- /** Full information about transaction nodes mapping. */
- protected GridDhtTxMapping txMapping;
-
/**
* @param cctx Context.
* @param tx Transaction.
@@ -160,8 +157,10 @@ public abstract class GridNearTxPrepareFutureAdapter extends
/**
* Checks if mapped transaction can be committed on one phase.
* One-phase commit can be done if transaction maps to one primary node and not more than one backup.
+ *
+ * @param txMapping Transaction mapping.
*/
- protected final void checkOnePhase() {
+ protected final void checkOnePhase(GridDhtTxMapping txMapping) {
if (tx.storeUsed())
return;
@@ -184,14 +183,13 @@ public abstract class GridNearTxPrepareFutureAdapter extends
* @param res Response.
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- protected final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
+ final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
if (res == null)
return;
assert res.error() == null : res;
- assert F.isEmpty(res.invalidPartitions()) : res;
- UUID nodeId = m.node().id();
+ UUID nodeId = m.primary().id();
for (Map.Entry<IgniteTxKey, CacheVersionedValue> entry : res.ownedValues().entrySet()) {
IgniteTxEntry txEntry = tx.entry(entry.getKey());
@@ -207,8 +205,11 @@ public abstract class GridNearTxPrepareFutureAdapter extends
CacheVersionedValue tup = entry.getValue();
- nearEntry.resetFromPrimary(tup.value(), tx.xidVersion(),
- tup.version(), nodeId, tx.topologyVersion());
+ nearEntry.resetFromPrimary(tup.value(),
+ tx.xidVersion(),
+ tup.version(),
+ nodeId,
+ tx.topologyVersion());
}
else if (txEntry.cached().detached()) {
GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached();
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index e55566b..ffeeb51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -23,17 +23,15 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
-import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
@@ -45,43 +43,36 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ private static final int NEAR_FLAG_MASK = 0x01;
+
+ /** */
+ private static final int FIRST_CLIENT_REQ_FLAG_MASK = 0x02;
+
+ /** */
+ private static final int IMPLICIT_SINGLE_FLAG_MASK = 0x04;
+
+ /** */
+ private static final int EXPLICIT_LOCK_FLAG_MASK = 0x08;
+
/** Future ID. */
private IgniteUuid futId;
/** Mini future ID. */
- private IgniteUuid miniId;
-
- /** Near mapping flag. */
- private boolean near;
+ private int miniId;
/** Topology version. */
private AffinityTopologyVersion topVer;
- /** {@code True} if this last prepare request for node. */
- private boolean last;
-
- /** IDs of backup nodes receiving last prepare request during this prepare. */
- @GridDirectCollection(UUID.class)
- @GridToStringInclude
- private Collection<UUID> lastBackups;
-
- /** Need return value flag. */
- private boolean retVal;
-
- /** Implicit single flag. */
- private boolean implicitSingle;
-
- /** Explicit lock flag. Set to true if at least one entry was explicitly locked. */
- private boolean explicitLock;
-
/** Subject ID. */
private UUID subjId;
/** Task name hash. */
private int taskNameHash;
- /** {@code True} if first optimistic tx prepare request sent from client node. */
- private boolean firstClientReq;
+ /** */
+ @GridToStringExclude
+ private byte flags;
/**
* Empty constructor required for {@link Externalizable}.
@@ -128,43 +119,42 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
boolean firstClientReq,
boolean addDepInfo
) {
- super(tx, timeout, reads, writes, txNodes, onePhaseCommit, addDepInfo);
+ super(tx,
+ timeout,
+ reads,
+ writes,
+ txNodes,
+ retVal,
+ last,
+ onePhaseCommit,
+ addDepInfo);
assert futId != null;
assert !firstClientReq || tx.optimistic() : tx;
this.futId = futId;
this.topVer = topVer;
- this.near = near;
- this.last = last;
- this.retVal = retVal;
- this.implicitSingle = implicitSingle;
- this.explicitLock = explicitLock;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
- this.firstClientReq = firstClientReq;
- }
- /**
- * @return {@code True} if first optimistic tx prepare request sent from client node.
- */
- public boolean firstClientRequest() {
- return firstClientReq;
+ setFlag(near, NEAR_FLAG_MASK);
+ setFlag(implicitSingle, IMPLICIT_SINGLE_FLAG_MASK);
+ setFlag(explicitLock, EXPLICIT_LOCK_FLAG_MASK);
+ setFlag(firstClientReq, FIRST_CLIENT_REQ_FLAG_MASK);
}
-
/**
- * @return {@code True} if this last prepare request for node.
+ * @return {@code True} if first optimistic tx prepare request sent from client node.
*/
- public boolean last() {
- return last;
+ public final boolean firstClientRequest() {
+ return isFlag(FIRST_CLIENT_REQ_FLAG_MASK);
}
/**
* @return {@code True} if mapping is for near-enabled caches.
*/
- public boolean near() {
- return near;
+ public final boolean near() {
+ return isFlag(NEAR_FLAG_MASK);
}
/**
@@ -177,14 +167,14 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
/**
* @param miniId Mini future ID.
*/
- public void miniId(IgniteUuid miniId) {
+ public void miniId(int miniId) {
this.miniId = miniId;
}
@@ -203,24 +193,17 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
}
/**
- * @return Whether return value is requested.
- */
- public boolean returnValue() {
- return retVal;
- }
-
- /**
* @return Implicit single flag.
*/
- public boolean implicitSingle() {
- return implicitSingle;
+ public final boolean implicitSingle() {
+ return isFlag(IMPLICIT_SINGLE_FLAG_MASK);
}
/**
* @return Explicit lock flag.
*/
- public boolean explicitLock() {
- return explicitLock;
+ public final boolean explicitLock() {
+ return isFlag(EXPLICIT_LOCK_FLAG_MASK);
}
/**
@@ -269,6 +252,26 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
return true;
}
+ /**
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ private void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ private boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -284,73 +287,37 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
}
switch (writer.state()) {
- case 23:
- if (!writer.writeBoolean("explicitLock", explicitLock))
- return false;
-
- writer.incrementState();
-
- case 24:
- if (!writer.writeBoolean("firstClientReq", firstClientReq))
+ case 20:
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
- case 25:
+ case 21:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 26:
- if (!writer.writeBoolean("implicitSingle", implicitSingle))
- return false;
-
- writer.incrementState();
-
- case 27:
- if (!writer.writeBoolean("last", last))
- return false;
-
- writer.incrementState();
-
- case 28:
- if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID))
- return false;
-
- writer.incrementState();
-
- case 29:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ case 22:
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
- case 30:
- if (!writer.writeBoolean("near", near))
- return false;
-
- writer.incrementState();
-
- case 31:
- if (!writer.writeBoolean("retVal", retVal))
- return false;
-
- writer.incrementState();
-
- case 32:
+ case 23:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 33:
+ case 24:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 34:
+ case 25:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -372,23 +339,15 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
return false;
switch (reader.state()) {
- case 23:
- explicitLock = reader.readBoolean("explicitLock");
+ case 20:
+ flags = reader.readByte("flags");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 24:
- firstClientReq = reader.readBoolean("firstClientReq");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 25:
+ case 21:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -396,55 +355,15 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 26:
- implicitSingle = reader.readBoolean("implicitSingle");
+ case 22:
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 27:
- last = reader.readBoolean("last");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 28:
- lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 29:
- miniId = reader.readIgniteUuid("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 30:
- near = reader.readBoolean("near");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 31:
- retVal = reader.readBoolean("retVal");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 32:
+ case 23:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -452,7 +371,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 33:
+ case 24:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -460,7 +379,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 34:
+ case 25:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -480,11 +399,24 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 35;
+ return 26;
}
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridNearTxPrepareRequest.class, this, super.toString());
+ StringBuilder flags = new StringBuilder();
+
+ if (near())
+ flags.append("near");
+ if (firstClientRequest())
+ flags.append("clientReq");
+ if (implicitSingle())
+ flags.append("single");
+ if (explicitLock())
+ flags.append("explicitLock");
+
+ return S.toString(GridNearTxPrepareRequest.class, this,
+ "flags", flags.toString(),
+ "super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index 8812709..66fe902 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -61,7 +61,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
private IgniteUuid futId;
/** Mini future ID. */
- private IgniteUuid miniId;
+ private int miniId;
/** DHT version. */
private GridCacheVersion dhtVer;
@@ -69,11 +69,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
/** Write version. */
private GridCacheVersion writeVer;
- /** */
- @GridToStringInclude
- @GridDirectCollection(int.class)
- private Collection<Integer> invalidParts;
-
/** Map of owned values to set on near node. */
@GridToStringInclude
@GridDirectTransient
@@ -107,6 +102,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param futId Future ID.
* @param miniId Mini future ID.
@@ -118,9 +114,10 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
* @param addDepInfo Deployment info flag.
*/
public GridNearTxPrepareResponse(
+ int part,
GridCacheVersion xid,
IgniteUuid futId,
- IgniteUuid miniId,
+ int miniId,
GridCacheVersion dhtVer,
GridCacheVersion writeVer,
GridCacheReturn retVal,
@@ -128,10 +125,9 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
AffinityTopologyVersion clientRemapVer,
boolean addDepInfo
) {
- super(xid, err, addDepInfo);
+ super(part, xid, err, addDepInfo);
assert futId != null;
- assert miniId != null;
assert dhtVer != null;
this.futId = futId;
@@ -145,7 +141,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
/**
* @return {@code True} if client node should remap transaction.
*/
- @Nullable public AffinityTopologyVersion clientRemapVersion() {
+ @Nullable AffinityTopologyVersion clientRemapVersion() {
return clientRemapVer;
}
@@ -170,7 +166,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
@@ -252,13 +248,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
return ownedVals != null && ownedVals.containsKey(key);
}
- /**
- * @return Invalid partitions.
- */
- public Collection<Integer> invalidPartitions() {
- return invalidParts;
- }
-
/** {@inheritDoc}
* @param ctx*/
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
@@ -355,67 +344,61 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
}
switch (writer.state()) {
- case 8:
+ case 10:
if (!writer.writeMessage("clientRemapVer", clientRemapVer))
return false;
writer.incrementState();
- case 9:
+ case 11:
if (!writer.writeMessage("dhtVer", dhtVer))
return false;
writer.incrementState();
- case 10:
+ case 12:
if (!writer.writeCollection("filterFailedKeys", filterFailedKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 11:
+ case 13:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 12:
- if (!writer.writeCollection("invalidParts", invalidParts, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ case 14:
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
- case 14:
+ case 15:
if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 15:
+ case 16:
if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 16:
+ case 17:
if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 17:
+ case 18:
if (!writer.writeMessage("retVal", retVal))
return false;
writer.incrementState();
- case 18:
+ case 19:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -437,7 +420,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
return false;
switch (reader.state()) {
- case 8:
+ case 10:
clientRemapVer = reader.readMessage("clientRemapVer");
if (!reader.isLastRead())
@@ -445,7 +428,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 9:
+ case 11:
dhtVer = reader.readMessage("dhtVer");
if (!reader.isLastRead())
@@ -453,7 +436,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 10:
+ case 12:
filterFailedKeys = reader.readCollection("filterFailedKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -461,7 +444,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 11:
+ case 13:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -469,23 +452,15 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 12:
- invalidParts = reader.readCollection("invalidParts", MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- miniId = reader.readIgniteUuid("miniId");
+ case 14:
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 14:
+ case 15:
ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -493,7 +468,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 15:
+ case 16:
ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -501,7 +476,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 16:
+ case 17:
pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -509,7 +484,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 17:
+ case 18:
retVal = reader.readMessage("retVal");
if (!reader.isLastRead())
@@ -517,7 +492,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 18:
+ case 19:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -537,12 +512,11 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 19;
+ return 20;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNearTxPrepareResponse.class, this, "super", super.toString());
}
-
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java
index be78868..c32a844 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java
@@ -62,14 +62,6 @@ public class GridNearUnlockRequest extends GridDistributedUnlockRequest {
writer.onHeaderWritten();
}
- switch (writer.state()) {
- case 8:
- if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
- }
-
return true;
}
@@ -83,16 +75,6 @@ public class GridNearUnlockRequest extends GridDistributedUnlockRequest {
if (!super.readFrom(buf, reader))
return false;
- switch (reader.state()) {
- case 8:
- partIds = reader.readCollection("partIds", MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
- }
-
return reader.afterMessageRead(GridNearUnlockRequest.class);
}
@@ -103,7 +85,7 @@ public class GridNearUnlockRequest extends GridDistributedUnlockRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 9;
+ return 8;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsImpl.java
index 7dec7af..9373bc4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsImpl.java
@@ -50,7 +50,7 @@ public class IgniteTxMappingsImpl implements IgniteTxMappings {
/** {@inheritDoc} */
@Override public void put(GridDistributedTxMapping mapping) {
- mappings.put(mapping.node().id(), mapping);
+ mappings.put(mapping.primary().id(), mapping);
}
/** {@inheritDoc} */
@@ -61,7 +61,7 @@ public class IgniteTxMappingsImpl implements IgniteTxMappings {
/** {@inheritDoc} */
@Nullable @Override public GridDistributedTxMapping localMapping() {
for (GridDistributedTxMapping m : mappings.values()) {
- if (m.node().isLocal())
+ if (m.primary().isLocal())
return m;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsSingleImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsSingleImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsSingleImpl.java
index fc15592..b37f8d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsSingleImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsSingleImpl.java
@@ -44,7 +44,7 @@ public class IgniteTxMappingsSingleImpl implements IgniteTxMappings {
@Override public GridDistributedTxMapping get(UUID nodeId) {
GridDistributedTxMapping mapping0 = mapping;
- return (mapping0 != null && mapping0.node().id().equals(nodeId)) ? mapping0 : null;
+ return (mapping0 != null && mapping0.primary().id().equals(nodeId)) ? mapping0 : null;
}
/** {@inheritDoc} */
@@ -58,7 +58,7 @@ public class IgniteTxMappingsSingleImpl implements IgniteTxMappings {
@Override public GridDistributedTxMapping remove(UUID nodeId) {
GridDistributedTxMapping mapping0 = mapping;
- if (mapping0 != null && mapping0.node().id().equals(nodeId)) {
+ if (mapping0 != null && mapping0.primary().id().equals(nodeId)) {
this.mapping = null;
return mapping0;
@@ -71,7 +71,7 @@ public class IgniteTxMappingsSingleImpl implements IgniteTxMappings {
@Nullable @Override public GridDistributedTxMapping localMapping() {
GridDistributedTxMapping mapping0 = mapping;
- if (mapping0 != null && mapping0.node().isLocal())
+ if (mapping0 != null && mapping0.primary().isLocal())
return mapping0;
return null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 1691fd7..14a7ed0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -66,7 +66,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_ST
* {@link #equals(Object)} method, as transaction entries should use referential
* equality.
*/
-@IgniteCodeGeneratingFail // Field filters, partId should not be generated by MessageCodeGenerator.
+@IgniteCodeGeneratingFail // Field filters should not be generated by MessageCodeGenerator.
public class IgniteTxEntry implements GridPeerDeployAware, Message {
/** */
private static final long serialVersionUID = 0L;
@@ -99,9 +99,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
/** Cache ID. */
private int cacheId;
- /** Partition ID. */
- private int partId = -1;
-
/** Transient tx key. */
@GridDirectTransient
private IgniteTxKey txKey;
@@ -261,7 +258,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
keepBinary(keepBinary);
key = entry.key();
- partId = entry.key().partition();
cacheId = entry.context().cacheId();
}
@@ -314,7 +310,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
addEntryProcessor(entryProcessor, invokeArgs);
key = entry.key();
- partId = entry.key().partition();
cacheId = entry.context().cacheId();
}
@@ -348,7 +343,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
IgniteTxEntry cp = new IgniteTxEntry();
cp.key = key;
- cp.partId = partId;
cp.cacheId = cacheId;
cp.ctx = ctx;
@@ -935,8 +929,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
key.finishUnmarshal(context().cacheObjectContext(), clsLdr);
- key.partition(partId);
-
val.unmarshal(this.ctx, clsLdr);
if (expiryPlcBytes != null && expiryPlc == null)
@@ -1067,40 +1059,35 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
writer.incrementState();
case 8:
- if (!writer.writeMessage("serReadVer", serReadVer))
+ if (!writer.writeMessage("oldVal", oldVal))
return false;
writer.incrementState();
case 9:
- if (!writer.writeByteArray("transformClosBytes", transformClosBytes))
+ if (!writer.writeMessage("serReadVer", serReadVer))
return false;
writer.incrementState();
case 10:
- if (!writer.writeLong("ttl", ttl))
+ if (!writer.writeByteArray("transformClosBytes", transformClosBytes))
return false;
writer.incrementState();
case 11:
- if (!writer.writeMessage("val", val))
+ if (!writer.writeLong("ttl", ttl))
return false;
writer.incrementState();
case 12:
- if (!writer.writeInt("partId", partId))
+ if (!writer.writeMessage("val", val))
return false;
writer.incrementState();
- case 13:
- if (!writer.writeMessage("oldVal", oldVal))
- return false;
-
- writer.incrementState();
}
return true;
@@ -1179,7 +1166,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
case 8:
- serReadVer = reader.readMessage("serReadVer");
+ oldVal = reader.readMessage("oldVal");
if (!reader.isLastRead())
return false;
@@ -1187,7 +1174,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
case 9:
- transformClosBytes = reader.readByteArray("transformClosBytes");
+ serReadVer = reader.readMessage("serReadVer");
if (!reader.isLastRead())
return false;
@@ -1195,7 +1182,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
case 10:
- ttl = reader.readLong("ttl");
+ transformClosBytes = reader.readByteArray("transformClosBytes");
if (!reader.isLastRead())
return false;
@@ -1203,7 +1190,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
case 11:
- val = reader.readMessage("val");
+ ttl = reader.readLong("ttl");
if (!reader.isLastRead())
return false;
@@ -1211,20 +1198,13 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
case 12:
- partId = reader.readInt("partId", -1);
+ val = reader.readMessage("val");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 13:
- oldVal = reader.readMessage("oldVal");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
}
return reader.afterMessageRead(IgniteTxEntry.class);
@@ -1237,7 +1217,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 14;
+ return 13;
}
/** {@inheritDoc} */