You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/13 16:11:59 UTC
[1/4] ignite git commit: Changed tx mini future ids from IgniteUuid
to int, removed some legacy code from tx processing.
Repository: ignite
Updated Branches:
refs/heads/ignite-2.0 cbc472fe7 -> 901be4f49
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 90a68ad..56a7fa2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -76,13 +76,11 @@ import org.apache.ignite.lang.IgniteFutureCancelledException;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
-import static org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture.FINISH_NEAR_ONE_PHASE_SINCE;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -114,7 +112,7 @@ public class IgniteTxHandler {
* @param req Request.
* @return Prepare future.
*/
- public IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) {
+ private IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) {
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Received near prepare request [txId=" + req.version() +
", node=" + nearNodeId + ']');
@@ -272,6 +270,7 @@ public class IgniteTxHandler {
U.error(log, "Failed to prepare DHT transaction: " + locTx, e);
return new GridNearTxPrepareResponse(
+ req.partition(),
req.version(),
req.futureId(),
req.miniId(),
@@ -287,6 +286,27 @@ public class IgniteTxHandler {
}
/**
+ * @param entries Entries.
+ * @return First entry, or {@code null} if {@code entries} is {@code null} or empty.
+ * @throws IgniteCheckedException If failed.
+ */
+ private IgniteTxEntry unmarshal(@Nullable Collection<IgniteTxEntry> entries) throws IgniteCheckedException {
+ if (entries == null)
+ return null;
+
+ IgniteTxEntry firstEntry = null;
+
+ for (IgniteTxEntry e : entries) {
+ e.unmarshal(ctx, false, ctx.deploy().globalLoader());
+
+ if (firstEntry == null)
+ firstEntry = e;
+ }
+
+ return firstEntry;
+ }
+
+ /**
* Prepares near transaction.
*
* @param nearNodeId Near node ID that initiated transaction.
@@ -309,15 +329,13 @@ public class IgniteTxHandler {
return null;
}
- IgniteTxEntry firstEntry = null;
+ IgniteTxEntry firstEntry;
try {
- for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) {
- e.unmarshal(ctx, false, ctx.deploy().globalLoader());
+ IgniteTxEntry firstWrite = unmarshal(req.writes());
+ IgniteTxEntry firstRead = unmarshal(req.reads());
- if (firstEntry == null)
- firstEntry = e;
- }
+ firstEntry = firstWrite != null ? firstWrite : firstRead;
}
catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
@@ -364,6 +382,7 @@ public class IgniteTxHandler {
}
GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+ req.partition(),
req.version(),
req.futureId(),
req.miniId(),
@@ -449,17 +468,16 @@ public class IgniteTxHandler {
tx.transactionNodes(req.transactionNodes());
- // Set near on originating node flag only if the sender node has new version.
- if (req.near() && FINISH_NEAR_ONE_PHASE_SINCE.compareTo(nearNode.version()) <= 0)
+ if (req.near())
tx.nearOnOriginatingNode(true);
if (req.onePhaseCommit()) {
- assert req.last();
+ assert req.last() : req;
tx.onePhaseCommit(true);
}
- if (req.returnValue())
+ if (req.needReturnValue())
tx.needReturnValue(true);
IgniteInternalFuture<GridNearTxPrepareResponse> fut = tx.prepareAsync(
@@ -778,8 +796,13 @@ public class IgniteTxHandler {
", commit=" + req.commit() + ']');
// Always send finish response.
- GridCacheMessage res = new GridNearTxFinishResponse(req.version(), req.threadId(), req.futureId(),
- req.miniId(), new IgniteCheckedException("Transaction has been already completed."));
+ GridCacheMessage res = new GridNearTxFinishResponse(
+ req.partition(),
+ req.version(),
+ req.threadId(),
+ req.futureId(),
+ req.miniId(),
+ new IgniteCheckedException("Transaction has been already completed."));
try {
ctx.io().send(nodeId, res, req.policy());
@@ -819,14 +842,9 @@ public class IgniteTxHandler {
try {
assert tx != null : "Transaction is null for near finish request [nodeId=" +
nodeId + ", req=" + req + "]";
+ assert req.syncMode() != null : req;
- if (req.syncMode() == null) {
- boolean sync = req.commit() ? req.syncCommit() : req.syncRollback();
-
- tx.syncMode(sync ? FULL_SYNC : FULL_ASYNC);
- }
- else
- tx.syncMode(req.syncMode());
+ tx.syncMode(req.syncMode());
if (req.commit()) {
tx.storeEnabled(req.storeEnabled());
@@ -920,7 +938,7 @@ public class IgniteTxHandler {
* @param nodeId Sender node ID.
* @param req Request.
*/
- protected final void processDhtTxPrepareRequest(final UUID nodeId, final GridDhtTxPrepareRequest req) {
+ private void processDhtTxPrepareRequest(final UUID nodeId, final GridDhtTxPrepareRequest req) {
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Received dht prepare request [txId=" + req.nearXidVersion() +
", dhtTxId=" + req.version() +
@@ -938,7 +956,12 @@ public class IgniteTxHandler {
GridDhtTxPrepareResponse res;
try {
- res = new GridDhtTxPrepareResponse(req.version(), req.futureId(), req.miniId(), req.deployInfo() != null);
+ res = new GridDhtTxPrepareResponse(
+ req.partition(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.deployInfo() != null);
// Start near transaction first.
nearTx = !F.isEmpty(req.nearWrites()) ? startNearRemoteTx(ctx.deploy().globalLoader(), nodeId, req) : null;
@@ -990,7 +1013,12 @@ public class IgniteTxHandler {
if (nearTx != null)
nearTx.rollback();
- res = new GridDhtTxPrepareResponse(req.version(), req.futureId(), req.miniId(), e,
+ res = new GridDhtTxPrepareResponse(
+ req.partition(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ e,
req.deployInfo() != null);
}
@@ -1041,7 +1069,7 @@ public class IgniteTxHandler {
* @param nodeId Node ID.
* @param req Request.
*/
- protected final void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId,
+ private void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId,
final GridDhtTxOnePhaseCommitAckRequest req) {
assert nodeId != null;
assert req != null;
@@ -1058,14 +1086,14 @@ public class IgniteTxHandler {
* @param req Request.
*/
@SuppressWarnings({"unchecked"})
- protected final void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest req) {
+ private void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest req) {
assert nodeId != null;
assert req != null;
if (req.checkCommitted()) {
boolean committed = req.waitRemoteTransactions() || !ctx.tm().addRolledbackTx(null, req.version());
- if (!committed || !req.syncCommit())
+ if (!committed || req.syncMode() != FULL_SYNC)
sendReply(nodeId, req, committed, null);
else {
IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version());
@@ -1301,9 +1329,13 @@ public class IgniteTxHandler {
* @param committed {@code True} if transaction committed on this node.
* @param nearTxId Near tx version.
*/
- protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) {
+ protected final void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) {
if (req.replyRequired() || req.checkCommitted()) {
- GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
+ GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(
+ req.partition(),
+ req.version(),
+ req.futureId(),
+ req.miniId());
if (req.checkCommitted()) {
res.checkCommitted(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index bd806aa..b1a4003 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -37,6 +37,7 @@ import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -3277,7 +3278,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
return new GridCacheReturn(cacheCtx, true, keepBinary, res, implicitRes.success());
}
catch (IgniteCheckedException | RuntimeException e) {
- rollbackAsync();
+ if (!(e instanceof NodeStoppingException))
+ rollbackAsync();
throw e;
}
[4/4] ignite git commit: Changed tx mini future ids from IgniteUuid
to int, removed some legacy code from tx processing.
Posted by sb...@apache.org.
Changed tx mini future ids from IgniteUuid to int, removed some legacy code from tx processing.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/901be4f4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/901be4f4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/901be4f4
Branch: refs/heads/ignite-2.0
Commit: 901be4f49440f7488781dd066bbef1cd2a85322f
Parents: cbc472f
Author: sboikov <sb...@gridgain.com>
Authored: Mon Mar 13 19:11:49 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Mar 13 19:11:49 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheIoManager.java | 2 +
.../processors/cache/KeyCacheObjectImpl.java | 9 +
.../distributed/GridDistributedBaseMessage.java | 2 +-
.../distributed/GridDistributedLockRequest.java | 26 +-
.../GridDistributedTxFinishRequest.java | 217 ++++++++++++----
.../GridDistributedTxFinishResponse.java | 75 +++++-
.../distributed/GridDistributedTxMapping.java | 45 +---
.../GridDistributedTxPrepareRequest.java | 205 ++++++++-------
.../GridDistributedTxPrepareResponse.java | 76 +++++-
.../GridDistributedUnlockRequest.java | 18 +-
.../distributed/dht/GridDhtLockRequest.java | 16 +-
.../distributed/dht/GridDhtTxFinishFuture.java | 47 ++--
.../distributed/dht/GridDhtTxFinishRequest.java | 251 ++++++------------
.../dht/GridDhtTxFinishResponse.java | 70 +++---
.../cache/distributed/dht/GridDhtTxLocal.java | 34 ++-
.../distributed/dht/GridDhtTxPrepareFuture.java | 130 +++++-----
.../dht/GridDhtTxPrepareRequest.java | 138 ++++------
.../dht/GridDhtTxPrepareResponse.java | 117 ++++-----
.../distributed/dht/GridDhtUnlockRequest.java | 15 +-
.../colocated/GridDhtColocatedLockFuture.java | 18 +-
.../distributed/near/GridNearGetRequest.java | 55 ++--
.../distributed/near/GridNearLockFuture.java | 20 +-
.../distributed/near/GridNearLockRequest.java | 26 +-
.../distributed/near/GridNearLockResponse.java | 12 +-
...arOptimisticSerializableTxPrepareFuture.java | 72 +++---
.../near/GridNearOptimisticTxPrepareFuture.java | 41 +--
.../GridNearPessimisticTxPrepareFuture.java | 67 +++--
.../near/GridNearSingleGetRequest.java | 46 +---
.../near/GridNearTxFinishFuture.java | 172 +++++++------
.../near/GridNearTxFinishRequest.java | 174 ++-----------
.../near/GridNearTxFinishResponse.java | 36 +--
.../cache/distributed/near/GridNearTxLocal.java | 14 +-
.../near/GridNearTxPrepareFutureAdapter.java | 19 +-
.../near/GridNearTxPrepareRequest.java | 252 +++++++------------
.../near/GridNearTxPrepareResponse.java | 86 +++----
.../distributed/near/GridNearUnlockRequest.java | 20 +-
.../distributed/near/IgniteTxMappingsImpl.java | 4 +-
.../near/IgniteTxMappingsSingleImpl.java | 6 +-
.../cache/transactions/IgniteTxEntry.java | 44 +---
.../cache/transactions/IgniteTxHandler.java | 92 ++++---
.../transactions/IgniteTxLocalAdapter.java | 4 +-
41 files changed, 1252 insertions(+), 1521 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 1cd8fbe..99878ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -557,6 +557,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridDhtTxPrepareRequest req = (GridDhtTxPrepareRequest)msg;
GridDhtTxPrepareResponse res = new GridDhtTxPrepareResponse(
+ req.partition(),
req.version(),
req.futureId(),
req.miniId(),
@@ -697,6 +698,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg;
GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+ req.partition(),
req.version(),
req.futureId(),
req.miniId(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
index 4f8570c..48797b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
@@ -200,4 +201,12 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
return val.equals(other.val);
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(S.INCLUDE_SENSITIVE ? getClass().getSimpleName() : "KeyCacheObject",
+ "part", part, true,
+ "val", val, true,
+ "hasValBytes", valBytes != null, false);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
index ebbc9ae..630c79f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
@@ -142,7 +142,7 @@ public abstract class GridDistributedBaseMessage extends GridCacheMessage implem
/**
* @return Count of keys referenced in candidates array (needed only locally for optimization).
*/
- public int keysCount() {
+ int keysCount() {
return cnt;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
index a671296..48c01f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
@@ -79,10 +79,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
@GridDirectCollection(KeyCacheObject.class)
private List<KeyCacheObject> keys;
- /** Partition IDs of keys to lock. */
- @GridDirectCollection(int.class)
- protected List<Integer> partIds;
-
/** Array indicating whether value should be returned for a key. */
@GridToStringInclude
private boolean[] retVals;
@@ -226,13 +222,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
}
/**
- * @return Return flags.
- */
- public boolean[] returnFlags() {
- return retVals;
- }
-
- /**
* Sets skip store flag value.
*
* @param skipStore Skip store flag.
@@ -289,15 +278,11 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
boolean retVal,
GridCacheContext ctx
) throws IgniteCheckedException {
- if (keys == null) {
+ if (keys == null)
keys = new ArrayList<>(keysCount());
- partIds = new ArrayList<>(keysCount());
- }
keys.add(key);
- partIds.add(key.partition());
-
retVals[idx] = retVal;
idx++;
@@ -312,7 +297,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
/** {@inheritDoc} */
@Override public int partition() {
- return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+ return keys != null && !keys.isEmpty() ? keys.get(0).partition() : -1;
}
/**
@@ -344,13 +329,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
GridCacheContext cctx = ctx.cacheContext(cacheId);
finishUnmarshalCacheObjects(keys, cctx, ldr);
-
- if (partIds != null && !partIds.isEmpty()) {
- assert partIds.size() == keys.size();
-
- for (int i = 0; i < keys.size(); i++)
- keys.get(i).partition(partIds.get(i));
- }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index 3e47cc9..ab9f0ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -20,18 +20,24 @@ package org.apache.ignite.internal.processors.cache.distributed;
import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
/**
* Transaction completion message.
*/
@@ -39,6 +45,27 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ protected static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01;
+
+ /** */
+ protected static final int CHECK_COMMITTED_FLAG_MASK = 0x02;
+
+ /** */
+ protected static final int NEED_RETURN_VALUE_FLAG_MASK = 0x04;
+
+ /** */
+ protected static final int SYS_INVALIDATE_FLAG_MASK = 0x08;
+
+ /** */
+ protected static final int EXPLICIT_LOCK_FLAG_MASK = 0x10;
+
+ /** */
+ protected static final int STORE_ENABLED_FLAG_MASK = 0x20;
+
+ /** Topology version. */
+ private AffinityTopologyVersion topVer;
+
/** Future ID. */
private IgniteUuid futId;
@@ -54,14 +81,6 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
/** Commit flag. */
private boolean commit;
- /** Sync commit flag. */
- @Deprecated
- private boolean syncCommit;
-
- /** Sync commit flag. */
- @Deprecated
- private boolean syncRollback;
-
/** Min version used as base for completed versions. */
private GridCacheVersion baseVer;
@@ -74,6 +93,18 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
/** IO policy. */
private byte plc;
+ /** Subject ID. */
+ private UUID subjId;
+
+ /** Task name hash. */
+ private int taskNameHash;
+
+ /** */
+ private byte flags;
+
+ /** Write synchronization mode. */
+ private CacheWriteSynchronizationMode syncMode;
+
/** Transient TX state. */
@GridDirectTransient
private IgniteTxState txState;
@@ -94,8 +125,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
* @param invalidate Invalidate flag.
* @param sys System transaction flag.
* @param plc IO policy.
- * @param syncCommit Sync commit flag.
- * @param syncRollback Sync rollback flag.
+ * @param syncMode Write synchronization mode.
* @param baseVer Base version.
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
@@ -105,39 +135,93 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
public GridDistributedTxFinishRequest(
GridCacheVersion xidVer,
IgniteUuid futId,
+ @NotNull AffinityTopologyVersion topVer,
@Nullable GridCacheVersion commitVer,
long threadId,
boolean commit,
boolean invalidate,
boolean sys,
byte plc,
- boolean syncCommit,
- boolean syncRollback,
+ CacheWriteSynchronizationMode syncMode,
GridCacheVersion baseVer,
Collection<GridCacheVersion> committedVers,
Collection<GridCacheVersion> rolledbackVers,
+ @Nullable UUID subjId,
+ int taskNameHash,
int txSize,
boolean addDepInfo
) {
super(xidVer, 0, addDepInfo);
+
assert xidVer != null;
+ assert syncMode != null;
this.futId = futId;
+ this.topVer = topVer;
this.commitVer = commitVer;
this.threadId = threadId;
this.commit = commit;
this.invalidate = invalidate;
this.sys = sys;
this.plc = plc;
- this.syncCommit = syncCommit;
- this.syncRollback = syncRollback;
+ this.syncMode = syncMode;
this.baseVer = baseVer;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
this.txSize = txSize;
completedVersions(committedVers, rolledbackVers);
}
/**
+ * @return Transaction write synchronization mode (can be null if message is sent from old nodes).
+ */
+ public final CacheWriteSynchronizationMode syncMode() {
+ return syncMode;
+ }
+
+ /**
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ protected final void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ protected final boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
+ /**
+ * @return Subject ID.
+ */
+ @Nullable public final UUID subjectId() {
+ return subjId;
+ }
+
+ /**
+ * @return Task name hash.
+ */
+ public final int taskNameHash() {
+ return taskNameHash;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ @Override public final AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
* @return System transaction flag.
*/
public boolean system() {
@@ -188,27 +272,6 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
}
/**
- * @return Sync commit flag.
- */
- public boolean syncCommit() {
- return syncCommit;
- }
-
- /**
- * @param syncCommit Sync commit flag.
- */
- public void syncCommit(boolean syncCommit) {
- this.syncCommit = syncCommit;
- }
-
- /**
- * @return Sync rollback flag.
- */
- public boolean syncRollback() {
- return syncRollback;
- }
-
- /**
* @return Base version.
*/
public GridCacheVersion baseVersion() {
@@ -227,7 +290,9 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
* @return {@code True} if reply is required.
*/
public boolean replyRequired() {
- return commit ? syncCommit : syncRollback;
+ assert syncMode != null;
+
+ return syncMode == FULL_SYNC;
}
/** {@inheritDoc} */
@@ -279,48 +344,66 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
writer.incrementState();
case 10:
- if (!writer.writeIgniteUuid("futId", futId))
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
case 11:
- if (!writer.writeBoolean("invalidate", invalidate))
+ if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
case 12:
- if (!writer.writeByte("plc", plc))
+ if (!writer.writeBoolean("invalidate", invalidate))
return false;
writer.incrementState();
case 13:
- if (!writer.writeBoolean("syncCommit", syncCommit))
+ if (!writer.writeByte("plc", plc))
return false;
writer.incrementState();
case 14:
- if (!writer.writeBoolean("syncRollback", syncRollback))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 15:
- if (!writer.writeBoolean("sys", sys))
+ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
return false;
writer.incrementState();
case 16:
- if (!writer.writeLong("threadId", threadId))
+ if (!writer.writeBoolean("sys", sys))
return false;
writer.incrementState();
case 17:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 18:
+ if (!writer.writeLong("threadId", threadId))
+ return false;
+
+ writer.incrementState();
+
+ case 19:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 20:
if (!writer.writeInt("txSize", txSize))
return false;
@@ -367,7 +450,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 10:
- futId = reader.readIgniteUuid("futId");
+ flags = reader.readByte("flags");
if (!reader.isLastRead())
return false;
@@ -375,7 +458,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 11:
- invalidate = reader.readBoolean("invalidate");
+ futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
return false;
@@ -383,7 +466,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 12:
- plc = reader.readByte("plc");
+ invalidate = reader.readBoolean("invalidate");
if (!reader.isLastRead())
return false;
@@ -391,7 +474,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 13:
- syncCommit = reader.readBoolean("syncCommit");
+ plc = reader.readByte("plc");
if (!reader.isLastRead())
return false;
@@ -399,7 +482,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 14:
- syncRollback = reader.readBoolean("syncRollback");
+ subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
return false;
@@ -407,15 +490,19 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 15:
- sys = reader.readBoolean("sys");
+ byte syncModeOrd;
+
+ syncModeOrd = reader.readByte("syncMode");
if (!reader.isLastRead())
return false;
+ syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
reader.incrementState();
case 16:
- threadId = reader.readLong("threadId");
+ sys = reader.readBoolean("sys");
if (!reader.isLastRead())
return false;
@@ -423,6 +510,30 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
reader.incrementState();
case 17:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 18:
+ threadId = reader.readLong("threadId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 19:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 20:
txSize = reader.readInt("txSize");
if (!reader.isLastRead())
@@ -442,7 +553,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 18;
+ return 21;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
index 109d665..2c446c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
@@ -17,13 +17,14 @@
package org.apache.ignite.internal.processors.cache.distributed;
-import java.io.Externalizable;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -41,25 +42,59 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
/** Future ID. */
private IgniteUuid futId;
+ /** */
+ @GridToStringExclude
+ private byte flags;
+
+ /** */
+ private int part;
+
/**
- * Empty constructor required by {@link Externalizable}.
+ * Empty constructor required by {@link GridIoMessageFactory}.
*/
public GridDistributedTxFinishResponse() {
/* No-op. */
}
/**
+ * @param part Partition.
* @param txId Transaction id.
* @param futId Future ID.
*/
- public GridDistributedTxFinishResponse(GridCacheVersion txId, IgniteUuid futId) {
+ public GridDistributedTxFinishResponse(int part, GridCacheVersion txId, IgniteUuid futId) {
assert txId != null;
assert futId != null;
+ this.part = part;
this.txId = txId;
this.futId = futId;
}
+ /** {@inheritDoc} */
+ @Override public final int partition() {
+ return part;
+ }
+
+ /**
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ protected final void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ protected final boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
/**
*
* @return Transaction id.
@@ -101,12 +136,24 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
switch (writer.state()) {
case 3:
- if (!writer.writeIgniteUuid("futId", futId))
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
case 4:
+ if (!writer.writeIgniteUuid("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeInt("part", part))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
if (!writer.writeMessage("txId", txId))
return false;
@@ -129,7 +176,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
switch (reader.state()) {
case 3:
- futId = reader.readIgniteUuid("futId");
+ flags = reader.readByte("flags");
if (!reader.isLastRead())
return false;
@@ -137,6 +184,22 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
reader.incrementState();
case 4:
+ futId = reader.readIgniteUuid("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ part = reader.readInt("part");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
txId = reader.readMessage("txId");
if (!reader.isLastRead())
@@ -156,7 +219,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 5;
+ return 7;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index 8c9f181..f8cec50 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -17,10 +17,6 @@
package org.apache.ignite.internal.processors.cache.distributed;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
@@ -33,19 +29,15 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
/**
* Transaction node mapping.
*/
-public class GridDistributedTxMapping implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
+public class GridDistributedTxMapping {
/** Mapped node. */
@GridToStringExclude
- private ClusterNode node;
+ private ClusterNode primary;
/** Entries. */
@GridToStringInclude
@@ -67,17 +59,10 @@ public class GridDistributedTxMapping implements Externalizable {
private boolean clientFirst;
/**
- * Empty constructor required for {@link Externalizable}.
+ * @param primary Primary node.
*/
- public GridDistributedTxMapping() {
- // No-op.
- }
-
- /**
- * @param node Mapped node.
- */
- public GridDistributedTxMapping(ClusterNode node) {
- this.node = node;
+ public GridDistributedTxMapping(ClusterNode primary) {
+ this.primary = primary;
entries = new LinkedHashSet<>();
}
@@ -127,8 +112,8 @@ public class GridDistributedTxMapping implements Externalizable {
/**
* @return Node.
*/
- public ClusterNode node() {
- return node;
+ public ClusterNode primary() {
+ return primary;
}
/**
@@ -235,21 +220,7 @@ public class GridDistributedTxMapping implements Externalizable {
}
/** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(node);
-
- U.writeCollection(out, entries);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- node = (ClusterNode)in.readObject();
-
- entries = U.readCollection(in);
- }
-
- /** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridDistributedTxMapping.class, this, "node", node.id());
+ return S.toString(GridDistributedTxMapping.class, this, "node", primary.id());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index 5e1499c..acf6bc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -40,11 +40,11 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAwa
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.UUIDCollectionMessage;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -60,18 +60,30 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
/** */
private static final long serialVersionUID = 0L;
- /** Version in which direct marshalling of tx nodes was introduced. */
- public static final IgniteProductVersion TX_NODES_DIRECT_MARSHALLABLE_SINCE = IgniteProductVersion.fromString("1.5.0");
+ /** */
+ private static final int NEED_RETURN_VALUE_FLAG_MASK = 0x01;
+
+ /** */
+ private static final int INVALIDATE_FLAG_MASK = 0x02;
+
+ /** */
+ private static final int ONE_PHASE_COMMIT_FLAG_MASK = 0x04;
+
+ /** */
+ private static final int LAST_REQ_FLAG_MASK = 0x08;
+
+ /** */
+ private static final int SYSTEM_TX_FLAG_MASK = 0x10;
/** Collection to message converter. */
- public static final C1<Collection<UUID>, UUIDCollectionMessage> COL_TO_MSG = new C1<Collection<UUID>, UUIDCollectionMessage>() {
+ private static final C1<Collection<UUID>, UUIDCollectionMessage> COL_TO_MSG = new C1<Collection<UUID>, UUIDCollectionMessage>() {
@Override public UUIDCollectionMessage apply(Collection<UUID> uuids) {
return new UUIDCollectionMessage(uuids);
}
};
/** Message to collection converter. */
- public static final C1<UUIDCollectionMessage, Collection<UUID>> MSG_TO_COL = new C1<UUIDCollectionMessage, Collection<UUID>>() {
+ private static final C1<UUIDCollectionMessage, Collection<UUID>> MSG_TO_COL = new C1<UUIDCollectionMessage, Collection<UUID>>() {
@Override public Collection<UUID> apply(UUIDCollectionMessage msg) {
return msg.uuids();
}
@@ -97,10 +109,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
@GridToStringInclude
private long timeout;
- /** Invalidation flag. */
- @GridToStringInclude
- private boolean invalidate;
-
/** Transaction read set. */
@GridToStringInclude
@GridDirectCollection(IgniteTxEntry.class)
@@ -135,15 +143,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
@GridDirectMap(keyType = UUID.class, valueType = UUIDCollectionMessage.class)
private Map<UUID, UUIDCollectionMessage> txNodesMsg;
- /** */
- private byte[] txNodesBytes;
-
- /** One phase commit flag. */
- private boolean onePhaseCommit;
-
- /** System flag. */
- private boolean sys;
-
/** IO policy. */
private byte plc;
@@ -151,6 +150,10 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
@GridDirectTransient
private IgniteTxState txState;
+ /** */
+ @GridToStringExclude
+ private byte flags;
+
/**
* Required by {@link Externalizable}.
*/
@@ -164,6 +167,8 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
* @param reads Read entries.
* @param writes Write entries.
* @param txNodes Transaction nodes mapping.
+ * @param retVal Return value flag.
+ * @param last Last request flag.
* @param onePhaseCommit One phase commit flag.
* @param addDepInfo Deployment info flag.
*/
@@ -173,6 +178,8 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
@Nullable Collection<IgniteTxEntry> reads,
Collection<IgniteTxEntry> writes,
Map<UUID, Collection<UUID>> txNodes,
+ boolean retVal,
+ boolean last,
boolean onePhaseCommit,
boolean addDepInfo
) {
@@ -182,16 +189,33 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
threadId = tx.threadId();
concurrency = tx.concurrency();
isolation = tx.isolation();
- invalidate = tx.isInvalidate();
txSize = tx.size();
- sys = tx.system();
plc = tx.ioPolicy();
this.timeout = timeout;
this.reads = reads;
this.writes = writes;
this.txNodes = txNodes;
- this.onePhaseCommit = onePhaseCommit;
+
+ setFlag(tx.system(), SYSTEM_TX_FLAG_MASK);
+ setFlag(retVal, NEED_RETURN_VALUE_FLAG_MASK);
+ setFlag(tx.isInvalidate(), INVALIDATE_FLAG_MASK);
+ setFlag(onePhaseCommit, ONE_PHASE_COMMIT_FLAG_MASK);
+ setFlag(last, LAST_REQ_FLAG_MASK);
+ }
+
+ /**
+ * @return Flag indicating whether transaction needs return value.
+ */
+ public final boolean needReturnValue() {
+ return isFlag(NEED_RETURN_VALUE_FLAG_MASK);
+ }
+
+ /**
+ * @param retVal Need return value.
+ */
+ public final void needReturnValue(boolean retVal) {
+ setFlag(retVal, NEED_RETURN_VALUE_FLAG_MASK);
}
/**
@@ -204,8 +228,8 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
/**
* @return System flag.
*/
- public boolean system() {
- return sys;
+ public final boolean system() {
+ return isFlag(SYSTEM_TX_FLAG_MASK);
}
/**
@@ -253,7 +277,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
* @return Invalidate flag.
*/
public boolean isInvalidate() {
- return invalidate;
+ return isFlag(INVALIDATE_FLAG_MASK);
}
/**
@@ -316,7 +340,14 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
* @return One phase commit flag.
*/
public boolean onePhaseCommit() {
- return onePhaseCommit;
+ return isFlag(ONE_PHASE_COMMIT_FLAG_MASK);
+ }
+
+ /**
+ * @return {@code True} if this is last prepare request for node.
+ */
+ public boolean last() {
+ return isFlag(LAST_REQ_FLAG_MASK);
}
/** {@inheritDoc} */
@@ -351,15 +382,8 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
dhtVerVals = dhtVers.values();
}
- // Marshal txNodes only if there is a node in topology with an older version.
- if (ctx.exchange().minimumNodeVersion(topologyVersion()).compareTo(TX_NODES_DIRECT_MARSHALLABLE_SINCE) < 0) {
- if (txNodes != null && txNodesBytes == null)
- txNodesBytes = U.marshal(ctx, txNodes);
- }
- else {
- if (txNodesMsg == null)
- txNodesMsg = F.viewReadOnly(txNodes, COL_TO_MSG);
- }
+ if (txNodesMsg == null)
+ txNodesMsg = F.viewReadOnly(txNodes, COL_TO_MSG);
}
/** {@inheritDoc} */
@@ -392,9 +416,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
if (txNodesMsg != null)
txNodes = F.viewReadOnly(txNodesMsg, MSG_TO_COL);
-
- if (txNodesBytes != null && txNodes == null)
- txNodes = U.unmarshal(ctx, txNodesBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
}
/** {@inheritDoc} */
@@ -407,6 +428,26 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
return ctx.txPrepareMessageLogger();
}
+ /**
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ private void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ private boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -441,7 +482,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
writer.incrementState();
case 10:
- if (!writer.writeBoolean("invalidate", invalidate))
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
@@ -453,66 +494,48 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
writer.incrementState();
case 12:
- if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit))
- return false;
-
- writer.incrementState();
-
- case 13:
if (!writer.writeByte("plc", plc))
return false;
writer.incrementState();
- case 14:
+ case 13:
if (!writer.writeCollection("reads", reads, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 15:
- if (!writer.writeBoolean("sys", sys))
- return false;
-
- writer.incrementState();
-
- case 16:
+ case 14:
if (!writer.writeLong("threadId", threadId))
return false;
writer.incrementState();
- case 17:
+ case 15:
if (!writer.writeLong("timeout", timeout))
return false;
writer.incrementState();
- case 18:
- if (!writer.writeByteArray("txNodesBytes", txNodesBytes))
- return false;
-
- writer.incrementState();
-
- case 19:
+ case 16:
if (!writer.writeMap("txNodesMsg", txNodesMsg, MessageCollectionItemType.UUID, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 20:
+ case 17:
if (!writer.writeInt("txSize", txSize))
return false;
writer.incrementState();
- case 21:
+ case 18:
if (!writer.writeMessage("writeVer", writeVer))
return false;
writer.incrementState();
- case 22:
+ case 19:
if (!writer.writeCollection("writes", writes, MessageCollectionItemType.MSG))
return false;
@@ -563,7 +586,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
case 10:
- invalidate = reader.readBoolean("invalidate");
+ flags = reader.readByte("flags");
if (!reader.isLastRead())
return false;
@@ -583,14 +606,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
case 12:
- onePhaseCommit = reader.readBoolean("onePhaseCommit");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
plc = reader.readByte("plc");
if (!reader.isLastRead())
@@ -598,7 +613,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
- case 14:
+ case 13:
reads = reader.readCollection("reads", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -606,15 +621,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
- case 15:
- sys = reader.readBoolean("sys");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 16:
+ case 14:
threadId = reader.readLong("threadId");
if (!reader.isLastRead())
@@ -622,7 +629,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
- case 17:
+ case 15:
timeout = reader.readLong("timeout");
if (!reader.isLastRead())
@@ -630,15 +637,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
- case 18:
- txNodesBytes = reader.readByteArray("txNodesBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
+ case 16:
txNodesMsg = reader.readMap("txNodesMsg", MessageCollectionItemType.UUID, MessageCollectionItemType.MSG, false);
if (!reader.isLastRead())
@@ -646,7 +645,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
- case 20:
+ case 17:
txSize = reader.readInt("txSize");
if (!reader.isLastRead())
@@ -654,7 +653,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
- case 21:
+ case 18:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -662,7 +661,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
- case 22:
+ case 19:
writes = reader.readCollection("writes", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -682,12 +681,26 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 23;
+ return 20;
}
/** {@inheritDoc} */
@Override public String toString() {
+ StringBuilder flags = new StringBuilder();
+
+ if (needReturnValue())
+ flags.append("retVal");
+ if (isInvalidate())
+ flags.append("invalidate");
+ if (onePhaseCommit())
+ flags.append("onePhase");
+ if (last())
+ flags.append("last");
+ if (system())
+ flags.append("sys");
+
return GridToStringBuilder.toString(GridDistributedTxPrepareRequest.class, this,
+ "flags", flags.toString(),
"super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index 76a5e31..53a1391 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -51,6 +51,12 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
@GridDirectTransient
private IgniteTxState txState;
+ /** */
+ private int part;
+
+ /** */
+ private byte flags;
+
/**
* Empty constructor (required by {@link Externalizable}).
*/
@@ -59,24 +65,54 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
}
/**
- * @param xid Transaction ID.
+ * @param part Partition.
+ * @param xid Lock or transaction ID.
* @param addDepInfo Deployment info flag.
*/
- public GridDistributedTxPrepareResponse(GridCacheVersion xid, boolean addDepInfo) {
+ public GridDistributedTxPrepareResponse(int part, GridCacheVersion xid, boolean addDepInfo) {
super(xid, 0, addDepInfo);
+
+ this.part = part;
}
/**
- * @param xid Lock ID.
+ * @param part Partition.
+ * @param xid Lock or transaction ID.
* @param err Error.
* @param addDepInfo Deployment info flag.
*/
- public GridDistributedTxPrepareResponse(GridCacheVersion xid, Throwable err, boolean addDepInfo) {
+ public GridDistributedTxPrepareResponse(int part, GridCacheVersion xid, Throwable err, boolean addDepInfo) {
super(xid, 0, addDepInfo);
+ this.part = part;
this.err = err;
}
+ /**
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ protected final void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ protected final boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition() {
+ return part;
+ }
+
/** {@inheritDoc} */
@Override public Throwable error() {
return err;
@@ -106,8 +142,6 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
this.txState = txState;
}
- /** {@inheritDoc}
- * @param ctx*/
/** {@inheritDoc} */
@Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
return ctx.txPrepareMessageLogger();
@@ -150,6 +184,18 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
writer.incrementState();
+ case 8:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeInt("part", part))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -174,6 +220,22 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
reader.incrementState();
+ case 8:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ part = reader.readInt("part");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(GridDistributedTxPrepareResponse.class);
@@ -186,7 +248,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 8;
+ return 10;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
index 5d70ec1..be7ecf8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
@@ -45,10 +45,6 @@ public class GridDistributedUnlockRequest extends GridDistributedBaseMessage {
@GridDirectCollection(KeyCacheObject.class)
private List<KeyCacheObject> keys;
- /** Partition IDs. */
- @GridDirectCollection(int.class)
- protected List<Integer> partIds;
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -80,18 +76,15 @@ public class GridDistributedUnlockRequest extends GridDistributedBaseMessage {
* @throws IgniteCheckedException If failed.
*/
public void addKey(KeyCacheObject key, GridCacheContext ctx) throws IgniteCheckedException {
- if (keys == null) {
+ if (keys == null)
keys = new ArrayList<>(keysCount());
- partIds = new ArrayList<>(keysCount());
- }
keys.add(key);
- partIds.add(key.partition());
}
/** {@inheritDoc} */
@Override public int partition() {
- return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+ return keys != null && !keys.isEmpty() ? keys.get(0).partition() : -1;
}
/** {@inheritDoc}
@@ -107,13 +100,6 @@ public class GridDistributedUnlockRequest extends GridDistributedBaseMessage {
super.finishUnmarshal(ctx, ldr);
finishUnmarshalCacheObjects(keys, ctx.cacheContext(cacheId), ldr);
-
- if (partIds != null && !partIds.isEmpty()) {
- assert partIds.size() == keys.size();
-
- for (int i = 0; i < keys.size(); i++)
- keys.get(i).partition(partIds.get(i));
- }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
index 95c6dfc..50167d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@ -419,12 +419,6 @@ public class GridDhtLockRequest extends GridDistributedLockRequest {
writer.incrementState();
- case 30:
- if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
}
return true;
@@ -521,14 +515,6 @@ public class GridDhtLockRequest extends GridDistributedLockRequest {
reader.incrementState();
- case 30:
- partIds = reader.readCollection("partIds", MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
}
return reader.afterMessageRead(GridDhtLockRequest.class);
@@ -541,7 +527,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 31;
+ return 30;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 60e07b3..17e9047 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -179,7 +179,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
if (isMini(fut)) {
MiniFuture f = (MiniFuture)fut;
- if (f.futureId().equals(res.miniId())) {
+ if (f.futureId() == res.miniId()) {
found = true;
assert f.node().id().equals(nodeId);
@@ -304,10 +304,12 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
boolean res = false;
+ int miniId = 0;
+
for (ClusterNode n : nodes) {
assert !n.isLocal();
- MiniFuture fut = new MiniFuture(n);
+ MiniFuture fut = new MiniFuture(++miniId, n);
add(fut); // Append new future.
@@ -325,8 +327,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.system(),
tx.ioPolicy(),
tx.isSystemInvalidate(),
- sync,
- sync,
+ sync ? FULL_SYNC : tx.syncMode(),
tx.completedBase(),
tx.committedVersions(),
tx.rolledbackVersions(),
@@ -391,9 +392,11 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
boolean res = false;
+ int miniId = 0;
+
// Create mini futures.
for (GridDistributedTxMapping dhtMapping : dhtMap.values()) {
- ClusterNode n = dhtMapping.node();
+ ClusterNode n = dhtMapping.primary();
assert !n.isLocal();
@@ -403,7 +406,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
// Nothing to send.
continue;
- MiniFuture fut = new MiniFuture(dhtMapping, nearMapping);
+ MiniFuture fut = new MiniFuture(++miniId, dhtMapping, nearMapping);
add(fut); // Append new future.
@@ -426,8 +429,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.system(),
tx.ioPolicy(),
tx.isSystemInvalidate(),
- sync,
- sync,
+ sync ? FULL_SYNC : tx.syncMode(),
tx.completedBase(),
tx.committedVersions(),
tx.rolledbackVersions(),
@@ -474,12 +476,12 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
}
for (GridDistributedTxMapping nearMapping : nearMap.values()) {
- if (!dhtMap.containsKey(nearMapping.node().id())) {
+ if (!dhtMap.containsKey(nearMapping.primary().id())) {
if (nearMapping.empty())
// Nothing to send.
continue;
- MiniFuture fut = new MiniFuture(null, nearMapping);
+ MiniFuture fut = new MiniFuture(++miniId, null, nearMapping);
add(fut); // Append new future.
@@ -497,8 +499,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.system(),
tx.ioPolicy(),
tx.isSystemInvalidate(),
- sync,
- sync,
+ sync ? FULL_SYNC : tx.syncMode(),
tx.completedBase(),
tx.committedVersions(),
tx.rolledbackVersions(),
@@ -513,12 +514,12 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
req.writeVersion(tx.writeVersion());
try {
- cctx.io().send(nearMapping.node(), req, tx.ioPolicy());
+ cctx.io().send(nearMapping.primary(), req, tx.ioPolicy());
if (msgLog.isDebugEnabled()) {
msgLog.debug("DHT finish fut, sent request near [txId=" + tx.nearXidVersion() +
", dhtTxId=" + tx.xidVersion() +
- ", node=" + nearMapping.node().id() + ']');
+ ", node=" + nearMapping.primary().id() + ']');
}
if (sync)
@@ -534,7 +535,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
if (msgLog.isDebugEnabled()) {
msgLog.debug("DHT finish fut, failed to send request near [txId=" + tx.nearXidVersion() +
", dhtTxId=" + tx.xidVersion() +
- ", node=" + nearMapping.node().id() +
+ ", node=" + nearMapping.primary().id() +
", err=" + e + ']');
}
@@ -573,7 +574,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
private static final long serialVersionUID = 0L;
/** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
+ private final int futId;
/** DHT mapping. */
@GridToStringInclude
@@ -588,19 +589,23 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
private ClusterNode node;
/**
+ * @param futId Future ID.
* @param node Node.
*/
- private MiniFuture(ClusterNode node) {
+ private MiniFuture(int futId, ClusterNode node) {
+ this.futId = futId;
this.node = node;
}
/**
+ * @param futId Future ID.
* @param dhtMapping Mapping.
* @param nearMapping nearMapping.
*/
- MiniFuture(GridDistributedTxMapping dhtMapping, GridDistributedTxMapping nearMapping) {
- assert dhtMapping == null || nearMapping == null || dhtMapping.node().equals(nearMapping.node());
+ MiniFuture(int futId, GridDistributedTxMapping dhtMapping, GridDistributedTxMapping nearMapping) {
+ assert dhtMapping == null || nearMapping == null || dhtMapping.primary().equals(nearMapping.primary());
+ this.futId = futId;
this.dhtMapping = dhtMapping;
this.nearMapping = nearMapping;
}
@@ -608,7 +613,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
/**
* @return Future ID.
*/
- IgniteUuid futureId() {
+ int futureId() {
return futId;
}
@@ -616,7 +621,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
* @return Node.
*/
public ClusterNode node() {
- return node != null ? node : dhtMapping != null ? dhtMapping.node() : nearMapping.node();
+ return node != null ? node : dhtMapping != null ? dhtMapping.primary() : nearMapping.primary();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index c618a18..d9b3ae7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.UUID;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
@@ -43,12 +44,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** */
private static final long serialVersionUID = 0L;
- /** */
- public static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01;
-
- /** */
- public static final int NEED_RETURN_VALUE_FLAG_MASK = 0x02;
-
/** Near node ID. */
private UUID nearNodeId;
@@ -56,22 +51,13 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
private TransactionIsolation isolation;
/** Mini future ID. */
- private IgniteUuid miniId;
-
- /** System invalidation flag. */
- private boolean sysInvalidate;
-
- /** Topology version. */
- private AffinityTopologyVersion topVer;
+ private int miniId;
/** Pending versions with order less than one for this message (needed for commit ordering). */
@GridToStringInclude
@GridDirectCollection(GridCacheVersion.class)
private Collection<GridCacheVersion> pendingVers;
- /** Check committed flag. */
- private boolean checkCommitted;
-
/** Partition update counter. */
@GridToStringInclude
@GridDirectCollection(Long.class)
@@ -80,15 +66,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** One phase commit write version. */
private GridCacheVersion writeVer;
- /** Subject ID. */
- private UUID subjId;
-
- /** Task name hash. */
- private int taskNameHash;
-
- /** */
- private byte flags;
-
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -110,8 +87,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
* @param sys System flag.
* @param plc IO policy.
* @param sysInvalidate System invalidation flag.
- * @param syncCommit Synchronous commit flag.
- * @param syncRollback Synchronous rollback flag.
+ * @param syncMode Write synchronization mode.
* @param baseVer Base version.
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
@@ -124,7 +100,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
public GridDhtTxFinishRequest(
UUID nearNodeId,
IgniteUuid futId,
- IgniteUuid miniId,
+ int miniId,
@NotNull AffinityTopologyVersion topVer,
GridCacheVersion xidVer,
GridCacheVersion commitVer,
@@ -135,8 +111,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
boolean sys,
byte plc,
boolean sysInvalidate,
- boolean syncCommit,
- boolean syncRollback,
+ CacheWriteSynchronizationMode syncMode,
GridCacheVersion baseVer,
Collection<GridCacheVersion> committedVers,
Collection<GridCacheVersion> rolledbackVers,
@@ -151,35 +126,34 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
super(
xidVer,
futId,
+ topVer,
commitVer,
threadId,
commit,
invalidate,
sys,
plc,
- syncCommit,
- syncRollback,
+ syncMode,
baseVer,
committedVers,
rolledbackVers,
+ subjId,
+ taskNameHash,
txSize,
addDepInfo);
- assert miniId != null;
+ assert miniId != 0;
assert nearNodeId != null;
assert isolation != null;
this.pendingVers = pendingVers;
- this.topVer = topVer;
this.nearNodeId = nearNodeId;
this.isolation = isolation;
this.miniId = miniId;
- this.sysInvalidate = sysInvalidate;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
needReturnValue(retVal);
waitRemoteTransactions(waitRemoteTxs);
+ systemInvalidate(sysInvalidate);
}
/**
@@ -196,8 +170,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
* @param sys System flag.
* @param plc IO policy.
* @param sysInvalidate System invalidation flag.
- * @param syncCommit Synchronous commit flag.
- * @param syncRollback Synchronous rollback flag.
+ * @param syncMode Write synchronization mode.
* @param baseVer Base version.
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
@@ -211,7 +184,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
public GridDhtTxFinishRequest(
UUID nearNodeId,
IgniteUuid futId,
- IgniteUuid miniId,
+ int miniId,
@NotNull AffinityTopologyVersion topVer,
GridCacheVersion xidVer,
GridCacheVersion commitVer,
@@ -222,8 +195,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
boolean sys,
byte plc,
boolean sysInvalidate,
- boolean syncCommit,
- boolean syncRollback,
+ CacheWriteSynchronizationMode syncMode,
GridCacheVersion baseVer,
Collection<GridCacheVersion> committedVers,
Collection<GridCacheVersion> rolledbackVers,
@@ -236,9 +208,30 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
boolean retVal,
boolean waitRemoteTxs
) {
- this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, isolation, commit, invalidate, sys, plc,
- sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, pendingVers, txSize,
- subjId, taskNameHash, addDepInfo, retVal, waitRemoteTxs);
+ this(nearNodeId,
+ futId,
+ miniId,
+ topVer,
+ xidVer,
+ commitVer,
+ threadId,
+ isolation,
+ commit,
+ invalidate,
+ sys,
+ plc,
+ sysInvalidate,
+ syncMode,
+ baseVer,
+ committedVers,
+ rolledbackVers,
+ pendingVers,
+ txSize,
+ subjId,
+ taskNameHash,
+ addDepInfo,
+ retVal,
+ waitRemoteTxs);
if (updateIdxs != null && !updateIdxs.isEmpty()) {
partUpdateCnt = new GridLongList(updateIdxs.size());
@@ -258,25 +251,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/**
* @return Mini ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
/**
- * @return Subject ID.
- */
- @Nullable public UUID subjectId() {
- return subjId;
- }
-
- /**
- * @return Task name hash.
- */
- public int taskNameHash() {
- return taskNameHash;
- }
-
- /**
* @return Transaction isolation.
*/
public TransactionIsolation isolation() {
@@ -294,7 +273,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
* @return System invalidate flag.
*/
public boolean isSystemInvalidate() {
- return sysInvalidate;
+ return isFlag(SYS_INVALIDATE_FLAG_MASK);
+ }
+
+ /**
+ * @param sysInvalidate System invalidation flag.
+ */
+ private void systemInvalidate(boolean sysInvalidate) {
+ setFlag(sysInvalidate, SYS_INVALIDATE_FLAG_MASK);
}
/**
@@ -312,63 +298,45 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
}
/**
- * @return Topology version.
- */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /**
* @return Check committed flag.
*/
public boolean checkCommitted() {
- return checkCommitted;
+ return isFlag(CHECK_COMMITTED_FLAG_MASK);
}
/**
* @param checkCommitted Check committed flag.
*/
public void checkCommitted(boolean checkCommitted) {
- this.checkCommitted = checkCommitted;
+ setFlag(checkCommitted, CHECK_COMMITTED_FLAG_MASK);
}
/**
* @return {@code True} if the wait remote transactions flag is set.
*/
public boolean waitRemoteTransactions() {
- return (flags & WAIT_REMOTE_TX_FLAG_MASK) != 0;
+ return isFlag(WAIT_REMOTE_TX_FLAG_MASK);
}
/**
* @param waitRemoteTxs Wait remote transactions flag.
*/
- public void waitRemoteTransactions(boolean waitRemoteTxs) {
- if (waitRemoteTxs)
- flags = (byte)(flags | WAIT_REMOTE_TX_FLAG_MASK);
- else
- flags &= ~WAIT_REMOTE_TX_FLAG_MASK;
+ private void waitRemoteTransactions(boolean waitRemoteTxs) {
+ setFlag(waitRemoteTxs, WAIT_REMOTE_TX_FLAG_MASK);
}
/**
* @return Flag indicating whether transaction needs return value.
*/
public boolean needReturnValue() {
- return (flags & NEED_RETURN_VALUE_FLAG_MASK) != 0;
+ return isFlag(NEED_RETURN_VALUE_FLAG_MASK);
}
/**
* @param retVal Need return value.
*/
public void needReturnValue(boolean retVal) {
- if (retVal)
- flags = (byte)(flags | NEED_RETURN_VALUE_FLAG_MASK);
- else
- flags &= ~NEED_RETURN_VALUE_FLAG_MASK;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxFinishRequest.class, this, super.toString());
+ setFlag(retVal, NEED_RETURN_VALUE_FLAG_MASK);
}
/** {@inheritDoc} */
@@ -386,73 +354,37 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
}
switch (writer.state()) {
- case 18:
- if (!writer.writeBoolean("checkCommitted", checkCommitted))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeByte("flags", flags))
- return false;
-
- writer.incrementState();
-
- case 20:
- if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
case 21:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
return false;
writer.incrementState();
case 22:
- if (!writer.writeUuid("nearNodeId", nearNodeId))
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
case 23:
- if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
+ if (!writer.writeUuid("nearNodeId", nearNodeId))
return false;
writer.incrementState();
case 24:
- if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
return false;
writer.incrementState();
case 25:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 26:
- if (!writer.writeBoolean("sysInvalidate", sysInvalidate))
- return false;
-
- writer.incrementState();
-
- case 27:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 28:
- if (!writer.writeMessage("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 29:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -474,23 +406,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
return false;
switch (reader.state()) {
- case 18:
- checkCommitted = reader.readBoolean("checkCommitted");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- flags = reader.readByte("flags");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 20:
+ case 21:
byte isolationOrd;
isolationOrd = reader.readByte("isolation");
@@ -502,16 +418,8 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 21:
- miniId = reader.readIgniteUuid("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
case 22:
- nearNodeId = reader.readUuid("nearNodeId");
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
@@ -519,7 +427,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 23:
- partUpdateCnt = reader.readMessage("partUpdateCnt");
+ nearNodeId = reader.readUuid("nearNodeId");
if (!reader.isLastRead())
return false;
@@ -527,7 +435,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 24:
- pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
+ partUpdateCnt = reader.readMessage("partUpdateCnt");
if (!reader.isLastRead())
return false;
@@ -535,7 +443,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 25:
- subjId = reader.readUuid("subjId");
+ pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -543,30 +451,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 26:
- sysInvalidate = reader.readBoolean("sysInvalidate");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 27:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 28:
- topVer = reader.readMessage("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 29:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -586,6 +470,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 30;
+ return 27;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtTxFinishRequest.class, this, super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index 8fb1f4e..bc9503f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -39,8 +39,11 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/** */
private static final long serialVersionUID = 0L;
+ /** Flag indicating if this is a check-committed response. */
+ private static final int CHECK_COMMITTED_FLAG_MASK = 0x01;
+
/** Mini future ID. */
- private IgniteUuid miniId;
+ private int miniId;
/** Error. */
@GridDirectTransient
@@ -49,9 +52,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/** Serialized error. */
private byte[] checkCommittedErrBytes;
- /** Flag indicating if this is a check-committed response. */
- private boolean checkCommitted;
-
/** Cache return value. */
private GridCacheReturn retVal;
@@ -63,14 +63,15 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param futId Future ID.
* @param miniId Mini future ID.
*/
- public GridDhtTxFinishResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId) {
- super(xid, futId);
+ public GridDhtTxFinishResponse(int part, GridCacheVersion xid, IgniteUuid futId, int miniId) {
+ super(part, xid, futId);
- assert miniId != null;
+ assert miniId != 0;
this.miniId = miniId;
}
@@ -78,7 +79,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
@@ -100,14 +101,14 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
* @return Check committed flag.
*/
public boolean checkCommitted() {
- return checkCommitted;
+ return isFlag(CHECK_COMMITTED_FLAG_MASK);
}
/**
* @param checkCommitted Check committed flag.
*/
public void checkCommitted(boolean checkCommitted) {
- this.checkCommitted = checkCommitted;
+ setFlag(checkCommitted, CHECK_COMMITTED_FLAG_MASK);
}
/** {@inheritDoc} */
@@ -158,11 +159,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
}
/** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxFinishResponse.class, this, super.toString());
- }
-
- /** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -177,25 +173,19 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
}
switch (writer.state()) {
- case 5:
- if (!writer.writeBoolean("checkCommitted", checkCommitted))
- return false;
-
- writer.incrementState();
-
- case 6:
+ case 7:
if (!writer.writeByteArray("checkCommittedErrBytes", checkCommittedErrBytes))
return false;
writer.incrementState();
- case 7:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ case 8:
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
- case 8:
+ case 9:
if (!writer.writeMessage("retVal", retVal))
return false;
@@ -217,15 +207,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
return false;
switch (reader.state()) {
- case 5:
- checkCommitted = reader.readBoolean("checkCommitted");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
+ case 7:
checkCommittedErrBytes = reader.readByteArray("checkCommittedErrBytes");
if (!reader.isLastRead())
@@ -233,15 +215,15 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
reader.incrementState();
- case 7:
- miniId = reader.readIgniteUuid("miniId");
+ case 8:
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 8:
+ case 9:
retVal = reader.readMessage("retVal");
if (!reader.isLastRead())
@@ -261,6 +243,18 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 9;
+ return 10;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ StringBuilder flags = new StringBuilder();
+
+ if (checkCommitted())
+ appendFlag(flags, "checkComm");
+
+ return S.toString(GridDhtTxFinishResponse.class, this,
+ "flags", flags.toString(),
+ "super", super.toString());
}
}
[3/4] ignite git commit: Changed tx mini future ids from IgniteUuid
to int, removed some legacy code from tx processing.
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 7199ede..bff69bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -72,13 +72,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
private IgniteUuid nearFutId;
/** Near mini future ID. */
- private IgniteUuid nearMiniId;
+ private int nearMiniId;
/** Near finish future ID. */
private IgniteUuid nearFinFutId;
/** Near finish mini future ID. */
- private IgniteUuid nearFinMiniId;
+ private int nearFinMiniId;
/** Near XID. */
private GridCacheVersion nearXidVer;
@@ -121,7 +121,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
UUID nearNodeId,
GridCacheVersion nearXidVer,
IgniteUuid nearFutId,
- IgniteUuid nearMiniId,
+ int nearMiniId,
long nearThreadId,
boolean implicit,
boolean implicitSingle,
@@ -159,7 +159,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
assert nearNodeId != null;
assert nearFutId != null;
- assert nearMiniId != null;
+ assert nearMiniId != 0;
assert nearXidVer != null;
this.nearNodeId = nearNodeId;
@@ -255,16 +255,9 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
/**
- * @return Near future mini ID.
- */
- public IgniteUuid nearFinishMiniId() {
- return nearFinMiniId;
- }
-
- /**
* @param nearFinMiniId Near future mini ID.
*/
- public void nearFinishMiniId(IgniteUuid nearFinMiniId) {
+ public void nearFinishMiniId(int nearFinMiniId) {
this.nearFinMiniId = nearFinMiniId;
}
@@ -394,7 +387,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
@Nullable Collection<IgniteTxEntry> writes,
Map<IgniteTxKey, GridCacheVersion> verMap,
long msgId,
- IgniteUuid nearMiniId,
+ int nearMiniId,
Map<UUID, Collection<UUID>> txNodes,
boolean last
) {
@@ -417,7 +410,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
needReturnValue()))) {
GridDhtTxPrepareFuture f = prepFut;
- assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " +
+ assert f.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " +
"[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']';
if (timeout == -1)
@@ -427,7 +420,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
}
else {
- assert fut.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " +
+ assert fut.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " +
"[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']';
// Prepare was called explicitly.
@@ -626,7 +619,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
"Invalid state [nearFinFutId=" + nearFinFutId + ", isInvalidate=" + isInvalidate() + ", commit=" + commit +
", sysInvalidate=" + isSystemInvalidate() + ", state=" + state() + ']';
- assert nearMiniId != null;
+ assert nearMiniId != 0;
return super.finish(commit);
}
@@ -641,8 +634,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
return;
}
- GridNearTxFinishResponse res = new GridNearTxFinishResponse(nearXidVer, threadId, nearFinFutId,
- nearFinMiniId, err);
+ GridNearTxFinishResponse res = new GridNearTxFinishResponse(
+ -1,
+ nearXidVer,
+ threadId,
+ nearFinFutId,
+ nearFinMiniId,
+ err);
try {
cctx.io().send(nearNodeId, res, ioPolicy());
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 1227ba9..56884ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -177,7 +177,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
private boolean trackable = true;
/** Near mini future id. */
- private IgniteUuid nearMiniId;
+ private int nearMiniId;
/** DHT versions map. */
private Map<IgniteTxKey, GridCacheVersion> dhtVerMap;
@@ -223,7 +223,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
GridCacheSharedContext cctx,
final GridDhtTxLocalAdapter tx,
long timeout,
- IgniteUuid nearMiniId,
+ int nearMiniId,
Map<IgniteTxKey, GridCacheVersion> dhtVerMap,
boolean last,
boolean retVal
@@ -263,7 +263,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/**
* @return Near mini future id.
*/
- public IgniteUuid nearMiniId() {
+ int nearMiniId() {
return nearMiniId;
}
@@ -562,7 +562,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @return Mini future.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- private MiniFuture miniFuture(IgniteUuid miniId) {
+ private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
synchronized (sync) {
int size = futuresCountNoLock();
@@ -576,7 +576,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
MiniFuture mini = (MiniFuture)fut;
- if (mini.futureId().equals(miniId)) {
+ if (mini.futureId() == miniId) {
if (!mini.isDone())
return mini;
else
@@ -856,9 +856,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
assert F.isEmpty(tx.invalidPartitions());
GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+ -1,
tx.nearXidVersion(),
tx.colocated() ? tx.xid() : tx.nearFutureId(),
- nearMiniId == null ? tx.xid() : nearMiniId,
+ nearMiniId,
tx.xidVersion(),
tx.writeVersion(),
ret,
@@ -1233,6 +1234,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
return;
if (last) {
+ int miniId = 0;
+
assert tx.transactionNodes() != null;
final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
@@ -1241,7 +1244,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) {
assert !dhtMapping.empty();
- ClusterNode n = dhtMapping.node();
+ ClusterNode n = dhtMapping.primary();
assert !n.isLocal();
@@ -1257,7 +1260,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (tx.remainingTime() == -1)
return;
- MiniFuture fut = new MiniFuture(n.id(), dhtMapping, nearMapping);
+ MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
add(fut); // Append new future.
@@ -1367,11 +1370,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
- if (!tx.dhtMap().containsKey(nearMapping.node().id())) {
+ if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
if (tx.remainingTime() == -1)
return;
- MiniFuture fut = new MiniFuture(nearMapping.node().id(), null, nearMapping);
+ MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping);
add(fut); // Append new future.
@@ -1417,12 +1420,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
assert req.transactionNodes() != null;
try {
- cctx.io().send(nearMapping.node(), req, tx.ioPolicy());
+ cctx.io().send(nearMapping.primary(), req, tx.ioPolicy());
if (msgLog.isDebugEnabled()) {
msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion() +
", dhtTxId=" + tx.xidVersion() +
- ", node=" + nearMapping.node().id() + ']');
+ ", node=" + nearMapping.primary().id() + ']');
}
}
catch (ClusterTopologyCheckedException ignored) {
@@ -1433,7 +1436,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (msgLog.isDebugEnabled()) {
msgLog.debug("DHT prepare fut, failed to send request near [txId=" + tx.nearXidVersion() +
", dhtTxId=" + tx.xidVersion() +
- ", node=" + nearMapping.node().id() + ']');
+ ", node=" + nearMapping.primary().id() + ']');
}
fut.onResult(e);
@@ -1442,7 +1445,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (msgLog.isDebugEnabled()) {
msgLog.debug("DHT prepare fut, failed to send request near, ignore [txId=" + tx.nearXidVersion() +
", dhtTxId=" + tx.xidVersion() +
- ", node=" + nearMapping.node().id() +
+ ", node=" + nearMapping.primary().id() +
", err=" + e + ']');
}
}
@@ -1479,27 +1482,37 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
while (true) {
try {
- Collection<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion());
+ List<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion());
+
+ assert dhtNodes.size() > 0 && dhtNodes.get(0).id().equals(cctx.localNodeId()) : dhtNodes;
if (log.isDebugEnabled())
log.debug("Mapping entry to DHT nodes [nodes=" + U.toShortString(dhtNodes) +
", entry=" + entry + ']');
- // Exclude local node.
- map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap);
+ for (int i = 1; i < dhtNodes.size(); i++) {
+ ClusterNode node = dhtNodes.get(i);
+
+ addMapping(entry, node, dhtMap);
+ }
Collection<UUID> readers = cached.readers();
if (!F.isEmpty(readers)) {
- Collection<ClusterNode> nearNodes =
- cctx.discovery().nodes(readers, F0.not(F.idForNodeId(tx.nearNodeId())));
+ for (UUID readerId : readers) {
+ if (readerId.equals(tx.nearNodeId()))
+ continue;
- if (log.isDebugEnabled())
- log.debug("Mapping entry to near nodes [nodes=" + U.toShortString(nearNodes) +
- ", entry=" + entry + ']');
+ ClusterNode readerNode = cctx.discovery().node(readerId);
+
+ if (readerNode == null || dhtNodes.contains(readerNode))
+ continue;
+
+ if (log.isDebugEnabled())
+ log.debug("Mapping entry to near node [node=" + readerNode + ", entry=" + entry + ']');
- // Exclude DHT nodes.
- map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap);
+ addMapping(entry, readerNode, nearMap);
+ }
}
else if (log.isDebugEnabled())
log.debug("Entry has no near readers: " + entry);
@@ -1516,39 +1529,35 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/**
* @param entry Entry.
- * @param nodes Nodes.
+ * @param n Node.
* @param globalMap Map.
*/
- private void map(
+ private void addMapping(
IgniteTxEntry entry,
- Iterable<ClusterNode> nodes,
+ ClusterNode n,
Map<UUID, GridDistributedTxMapping> globalMap
) {
- if (nodes != null) {
- for (ClusterNode n : nodes) {
- GridDistributedTxMapping global = globalMap.get(n.id());
+ GridDistributedTxMapping global = globalMap.get(n.id());
- if (!F.isEmpty(entry.entryProcessors())) {
- GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
- entry.cached().partition());
+ if (!F.isEmpty(entry.entryProcessors())) {
+ GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
+ entry.cached().partition());
- if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
- T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue();
+ if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
+ T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue();
- assert procVal != null : entry;
+ assert procVal != null : entry;
- entry.op(procVal.get1());
- entry.value(procVal.get2(), true, false);
- entry.entryProcessors(null);
- }
- }
-
- if (global == null)
- globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
-
- global.add(entry);
+ entry.op(procVal.get1());
+ entry.value(procVal.get2(), true, false);
+ entry.entryProcessors(null);
}
}
+
+ if (global == null)
+ globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
+
+ global.add(entry);
}
/**
@@ -1602,7 +1611,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
private static final long serialVersionUID = 0L;
/** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
+ private final int futId;
/** Node ID. */
private UUID nodeId;
@@ -1617,17 +1626,20 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/**
* @param nodeId Node ID.
+ * @param futId Future ID.
* @param dhtMapping Mapping.
* @param nearMapping nearMapping.
*/
MiniFuture(
UUID nodeId,
+ int futId,
GridDistributedTxMapping dhtMapping,
GridDistributedTxMapping nearMapping
) {
- assert dhtMapping == null || nearMapping == null || dhtMapping.node().equals(nearMapping.node());
+ assert dhtMapping == null || nearMapping == null || dhtMapping.primary().equals(nearMapping.primary());
this.nodeId = nodeId;
+ this.futId = futId;
this.dhtMapping = dhtMapping;
this.nearMapping = nearMapping;
}
@@ -1635,7 +1647,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/**
* @return Future ID.
*/
- IgniteUuid futureId() {
+ int futureId() {
return futId;
}
@@ -1643,7 +1655,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @return Node ID.
*/
public ClusterNode node() {
- return dhtMapping != null ? dhtMapping.node() : nearMapping.node();
+ return dhtMapping != null ? dhtMapping.primary() : nearMapping.primary();
}
/**
@@ -1689,7 +1701,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
try {
GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
- cached.removeReader(nearMapping.node().id(), res.messageId());
+ cached.removeReader(nearMapping.primary().id(), res.messageId());
break;
}
@@ -1709,22 +1721,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
// Process invalid partitions (no need to remap).
- // Keep this loop for backward compatibility.
- if (!F.isEmpty(res.invalidPartitions())) {
- for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) {
- IgniteTxEntry entry = it.next();
-
- if (res.invalidPartitions().contains(entry.cached().partition())) {
- it.remove();
-
- if (log.isDebugEnabled())
- log.debug("Removed mapping for entry from dht mapping [key=" + entry.key() +
- ", tx=" + tx + ", dhtMapping=" + dhtMapping + ']');
- }
- }
- }
-
- // Process invalid partitions (no need to remap).
if (!F.isEmpty(res.invalidPartitionsByCacheId())) {
Map<Integer, int[]> invalidPartsMap = res.invalidPartitionsByCacheId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index a8f2087..8898803 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -52,9 +52,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** */
private static final long serialVersionUID = 0L;
- /** */
- public static final int NEED_RETURN_VALUE_FLAG_MASK = 0x01;
-
/** Max order. */
private UUID nearNodeId;
@@ -62,7 +59,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
private IgniteUuid futId;
/** Mini future ID. */
- private IgniteUuid miniId;
+ private int miniId;
/** Topology version. */
private AffinityTopologyVersion topVer;
@@ -91,9 +88,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** Near transaction ID. */
private GridCacheVersion nearXidVer;
- /** {@code True} if this is last prepare request for node. */
- private boolean last;
-
/** Subject ID. */
private UUID subjId;
@@ -103,9 +97,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** Preload keys. */
private BitSet preloadKeys;
- /** */
- private byte flags;
-
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -129,7 +120,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
*/
public GridDhtTxPrepareRequest(
IgniteUuid futId,
- IgniteUuid miniId,
+ int miniId,
AffinityTopologyVersion topVer,
GridDhtTxLocalAdapter tx,
long timeout,
@@ -143,17 +134,24 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
int taskNameHash,
boolean addDepInfo,
boolean retVal) {
- super(tx, timeout, null, dhtWrites, txNodes, onePhaseCommit, addDepInfo);
+ super(tx,
+ timeout,
+ null,
+ dhtWrites,
+ txNodes,
+ retVal,
+ last,
+ onePhaseCommit,
+ addDepInfo);
assert futId != null;
- assert miniId != null;
+ assert miniId != 0;
this.topVer = topVer;
this.futId = futId;
this.nearWrites = nearWrites;
this.miniId = miniId;
this.nearXidVer = nearXidVer;
- this.last = last;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
@@ -165,30 +163,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
}
/**
- * @return Flag indicating whether transaction needs return value.
- */
- public boolean needReturnValue() {
- return (flags & NEED_RETURN_VALUE_FLAG_MASK) != 0;
- }
-
- /**
- * @param retVal Need return value.
- */
- public void needReturnValue(boolean retVal) {
- if (retVal)
- flags = (byte)(flags | NEED_RETURN_VALUE_FLAG_MASK);
- else
- flags &= ~NEED_RETURN_VALUE_FLAG_MASK;
- }
-
- /**
- * @return {@code True} if this is last prepare request for node.
- */
- public boolean last() {
- return last;
- }
-
- /**
* @return Near transaction ID.
*/
public GridCacheVersion nearXidVersion() {
@@ -227,7 +201,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
* @param idx Entry index to set invalidation flag.
* @param invalidate Invalidation flag value.
*/
- public void invalidateNearEntry(int idx, boolean invalidate) {
+ void invalidateNearEntry(int idx, boolean invalidate) {
invalidateNearEntries.set(idx, invalidate);
}
@@ -244,7 +218,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
*
* @param idx Key index.
*/
- public void markKeyForPreload(int idx) {
+ void markKeyForPreload(int idx) {
if (preloadKeys == null)
preloadKeys = new BitSet();
@@ -271,7 +245,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
@@ -374,85 +348,73 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
}
switch (writer.state()) {
- case 23:
- if (!writer.writeByte("flags", flags))
- return false;
-
- writer.incrementState();
-
- case 24:
+ case 20:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 25:
+ case 21:
if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries))
return false;
writer.incrementState();
- case 26:
- if (!writer.writeBoolean("last", last))
- return false;
-
- writer.incrementState();
-
- case 27:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ case 22:
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
- case 28:
+ case 23:
if (!writer.writeUuid("nearNodeId", nearNodeId))
return false;
writer.incrementState();
- case 29:
+ case 24:
if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 30:
+ case 25:
if (!writer.writeMessage("nearXidVer", nearXidVer))
return false;
writer.incrementState();
- case 31:
+ case 26:
if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 32:
+ case 27:
if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 33:
+ case 28:
if (!writer.writeBitSet("preloadKeys", preloadKeys))
return false;
writer.incrementState();
- case 34:
+ case 29:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 35:
+ case 30:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 36:
+ case 31:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -474,15 +436,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
return false;
switch (reader.state()) {
- case 23:
- flags = reader.readByte("flags");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 24:
+ case 20:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -490,7 +444,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 25:
+ case 21:
invalidateNearEntries = reader.readBitSet("invalidateNearEntries");
if (!reader.isLastRead())
@@ -498,23 +452,15 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 26:
- last = reader.readBoolean("last");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 27:
- miniId = reader.readIgniteUuid("miniId");
+ case 22:
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 28:
+ case 23:
nearNodeId = reader.readUuid("nearNodeId");
if (!reader.isLastRead())
@@ -522,7 +468,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 29:
+ case 24:
nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -530,7 +476,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 30:
+ case 25:
nearXidVer = reader.readMessage("nearXidVer");
if (!reader.isLastRead())
@@ -538,7 +484,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 31:
+ case 26:
ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -546,7 +492,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 32:
+ case 27:
ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -554,7 +500,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 33:
+ case 28:
preloadKeys = reader.readBitSet("preloadKeys");
if (!reader.isLastRead())
@@ -562,7 +508,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 34:
+ case 29:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -570,7 +516,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 35:
+ case 30:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -578,7 +524,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 36:
+ case 31:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -598,6 +544,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 37;
+ return 32;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index 2eba9f1..fba68ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -58,16 +58,11 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
private IgniteUuid futId;
/** Mini future ID. */
- private IgniteUuid miniId;
-
- /** Invalid partitions. */
- @GridToStringInclude
- @GridDirectCollection(int.class)
- private Collection<Integer> invalidParts;
+ private int miniId;
/** Invalid partitions by cache ID. */
@GridDirectMap(keyType = Integer.class, valueType = int[].class)
- private Map<Integer, int[]> invalidPartsByCacheId;
+ private Map<Integer, int[]> invalidParts;
/** Preload entries. */
@GridDirectCollection(GridCacheEntryInfo.class)
@@ -81,34 +76,46 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param futId Future ID.
* @param miniId Mini future ID.
* @param addDepInfo Deployment info flag.
*/
- public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, boolean addDepInfo) {
- super(xid, addDepInfo);
+ public GridDhtTxPrepareResponse(
+ int part,
+ GridCacheVersion xid,
+ IgniteUuid futId,
+ int miniId,
+ boolean addDepInfo) {
+ super(part, xid, addDepInfo);
assert futId != null;
- assert miniId != null;
+ assert miniId != 0;
this.futId = futId;
this.miniId = miniId;
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param futId Future ID.
* @param miniId Mini future ID.
* @param err Error.
* @param addDepInfo Deployment enabled.
*/
- public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, Throwable err,
+ public GridDhtTxPrepareResponse(
+ int part,
+ GridCacheVersion xid,
+ IgniteUuid futId,
+ int miniId,
+ Throwable err,
boolean addDepInfo) {
- super(xid, err, addDepInfo);
+ super(part, xid, err, addDepInfo);
assert futId != null;
- assert miniId != null;
+ assert miniId != 0;
this.futId = futId;
this.miniId = miniId;
@@ -117,7 +124,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
/**
* @return Evicted readers.
*/
- public Collection<IgniteTxKey> nearEvicted() {
+ Collection<IgniteTxKey> nearEvicted() {
return nearEvicted;
}
@@ -138,36 +145,22 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
/**
- * @return Invalid partitions.
- */
- public Collection<Integer> invalidPartitions() {
- return invalidParts;
- }
-
- /**
- * @param invalidParts Invalid partitions.
- */
- public void invalidPartitions(Collection<Integer> invalidParts) {
- this.invalidParts = invalidParts;
- }
-
- /**
* @return Map from cacheId to an array of invalid partitions.
*/
- public Map<Integer, int[]> invalidPartitionsByCacheId() {
- return invalidPartsByCacheId;
+ Map<Integer, int[]> invalidPartitionsByCacheId() {
+ return invalidParts;
}
/**
* @param invalidPartsByCacheId Map from cache ID to an array of invalid partitions.
*/
public void invalidPartitionsByCacheId(Map<Integer, Set<Integer>> invalidPartsByCacheId) {
- this.invalidPartsByCacheId = CU.convertInvalidPartitions(invalidPartsByCacheId);
+ this.invalidParts = CU.convertInvalidPartitions(invalidPartsByCacheId);
}
/**
@@ -175,7 +168,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
*
* @return Collection of entry infos need to be preloaded.
*/
- public Collection<GridCacheEntryInfo> preloadEntries() {
+ Collection<GridCacheEntryInfo> preloadEntries() {
return preloadEntries == null ? Collections.<GridCacheEntryInfo>emptyList() : preloadEntries;
}
@@ -193,8 +186,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
preloadEntries.add(info);
}
- /** {@inheritDoc}
- * @param ctx*/
+ /** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
@@ -237,11 +229,6 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
}
/** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxPrepareResponse.class, this, "super", super.toString());
- }
-
- /** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -256,37 +243,31 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
}
switch (writer.state()) {
- case 8:
+ case 10:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 9:
- if (!writer.writeCollection("invalidParts", invalidParts, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeMap("invalidPartsByCacheId", invalidPartsByCacheId, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
+ case 11:
+ if (!writer.writeMap("invalidParts", invalidParts, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
return false;
writer.incrementState();
- case 11:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ case 12:
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
- case 12:
+ case 13:
if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 13:
+ case 14:
if (!writer.writeCollection("preloadEntries", preloadEntries, MessageCollectionItemType.MSG))
return false;
@@ -308,7 +289,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
return false;
switch (reader.state()) {
- case 8:
+ case 10:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -316,31 +297,23 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
- case 9:
- invalidParts = reader.readCollection("invalidParts", MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- invalidPartsByCacheId = reader.readMap("invalidPartsByCacheId", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
+ case 11:
+ invalidParts = reader.readMap("invalidParts", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 11:
- miniId = reader.readIgniteUuid("miniId");
+ case 12:
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 12:
+ case 13:
nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -348,7 +321,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
- case 13:
+ case 14:
preloadEntries = reader.readCollection("preloadEntries", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -368,6 +341,12 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 14;
+ return 15;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtTxPrepareResponse.class, this,
+ "super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java
index 3737295..752df54 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java
@@ -119,11 +119,6 @@ public class GridDhtUnlockRequest extends GridDistributedUnlockRequest {
writer.incrementState();
- case 9:
- if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
}
return true;
@@ -148,14 +143,6 @@ public class GridDhtUnlockRequest extends GridDistributedUnlockRequest {
reader.incrementState();
- case 9:
- partIds = reader.readCollection("partIds", MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
}
return reader.afterMessageRead(GridDhtUnlockRequest.class);
@@ -168,6 +155,6 @@ public class GridDhtUnlockRequest extends GridDistributedUnlockRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 10;
+ return 9;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 79ca108..0ce380d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -160,6 +160,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
/** Keep binary. */
private final boolean keepBinary;
+ /** */
+ private int miniId;
+
/**
* @param cctx Registry.
* @param keys Keys to lock.
@@ -485,7 +488,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
* @return Mini future.
*/
@SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"})
- private MiniFuture miniFuture(IgniteUuid miniId) {
+ private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
synchronized (sync) {
int size = futuresCountNoLock();
@@ -499,7 +502,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
MiniFuture mini = (MiniFuture)fut;
- if (mini.futureId().equals(miniId)) {
+ if (mini.futureId() == miniId) {
if (!mini.isDone())
return mini;
else
@@ -1049,7 +1052,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
if (node.isLocal())
lockLocally(mappedKeys, req.topologyVersion());
else {
- final MiniFuture fut = new MiniFuture(node, mappedKeys);
+ final MiniFuture fut = new MiniFuture(node, mappedKeys, ++miniId);
req.miniId(fut.futureId());
@@ -1393,7 +1396,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
private static final long serialVersionUID = 0L;
/** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
+ private final int futId;
/** Node ID. */
@GridToStringExclude
@@ -1409,19 +1412,22 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
/**
* @param node Node.
* @param keys Keys.
+ * @param futId Mini future ID.
*/
MiniFuture(
ClusterNode node,
- Collection<KeyCacheObject> keys
+ Collection<KeyCacheObject> keys,
+ int futId
) {
this.node = node;
this.keys = keys;
+ this.futId = futId;
}
/**
* @return Future ID.
*/
- IgniteUuid futureId() {
+ int futureId() {
return futId;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index 7ca2635..79c71b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -73,10 +73,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
@GridDirectCollection(KeyCacheObject.class)
private List<KeyCacheObject> keys;
- /** Partition IDs. */
- @GridDirectCollection(int.class)
- private List<Integer> partIds;
-
/** */
@GridDirectCollection(boolean.class)
private Collection<Boolean> flags;
@@ -154,12 +150,10 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
this.keys = new ArrayList<>(keys.size());
flags = new ArrayList<>(keys.size());
- partIds = new ArrayList<>(keys.size());
for (Map.Entry<KeyCacheObject, Boolean> entry : keys.entrySet()) {
this.keys.add(entry.getKey());
flags.add(entry.getValue());
- partIds.add(entry.getKey().partition());
}
this.readThrough = readThrough;
@@ -259,7 +253,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
/** {@inheritDoc} */
@Override public int partition() {
- return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+ return keys != null && !keys.isEmpty() ? keys.get(0).partition() : -1;
}
/**
@@ -302,13 +296,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
while (keysIt.hasNext())
keyMap.put(keysIt.next(), flagsIt.next());
}
-
- if (partIds != null && !partIds.isEmpty()) {
- assert partIds.size() == keys.size();
-
- for (int i = 0; i < keys.size(); i++)
- keys.get(i).partition(partIds.get(i));
- }
}
/** {@inheritDoc} */
@@ -368,48 +355,42 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
writer.incrementState();
case 9:
- if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 10:
if (!writer.writeBoolean("readThrough", readThrough))
return false;
writer.incrementState();
- case 11:
+ case 10:
if (!writer.writeBoolean("reload", reload))
return false;
writer.incrementState();
- case 12:
+ case 11:
if (!writer.writeBoolean("skipVals", skipVals))
return false;
writer.incrementState();
- case 13:
+ case 12:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 14:
+ case 13:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 15:
+ case 14:
if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
- case 16:
+ case 15:
if (!writer.writeMessage("ver", ver))
return false;
@@ -480,14 +461,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
reader.incrementState();
case 9:
- partIds = reader.readCollection("partIds", MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
readThrough = reader.readBoolean("readThrough");
if (!reader.isLastRead())
@@ -495,7 +468,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
reader.incrementState();
- case 11:
+ case 10:
reload = reader.readBoolean("reload");
if (!reader.isLastRead())
@@ -503,7 +476,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
reader.incrementState();
- case 12:
+ case 11:
skipVals = reader.readBoolean("skipVals");
if (!reader.isLastRead())
@@ -511,7 +484,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
reader.incrementState();
- case 13:
+ case 12:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -519,7 +492,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
reader.incrementState();
- case 14:
+ case 13:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -527,7 +500,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
reader.incrementState();
- case 15:
+ case 14:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -535,7 +508,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
reader.incrementState();
- case 16:
+ case 15:
ver = reader.readMessage("ver");
if (!reader.isLastRead())
@@ -555,7 +528,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 17;
+ return 16;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index d3e3a15..ffc84d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -164,6 +164,9 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
/** Keep binary context flag. */
private final boolean keepBinary;
+ /** */
+ private int miniId;
+
/**
* @param cctx Registry.
* @param keys Keys to lock.
@@ -532,7 +535,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
* @return Mini future.
*/
@SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"})
- private MiniFuture miniFuture(IgniteUuid miniId) {
+ private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
synchronized (sync) {
int size = futuresCountNoLock();
@@ -546,7 +549,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
MiniFuture mini = (MiniFuture)fut;
- if (mini.futureId().equals(miniId)) {
+ if (mini.futureId() == miniId) {
if (!mini.isDone())
return mini;
else
@@ -1178,7 +1181,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
req.filter(filter, cctx);
if (node.isLocal()) {
- req.miniId(IgniteUuid.randomUuid());
+ req.miniId(-1);
if (log.isDebugEnabled())
log.debug("Before locally locking near request: " + req);
@@ -1316,7 +1319,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
fut));
}
else {
- final MiniFuture fut = new MiniFuture(node, mappedKeys);
+ final MiniFuture fut = new MiniFuture(node, mappedKeys, ++miniId);
req.miniId(fut.futureId());
@@ -1489,7 +1492,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
private static final long serialVersionUID = 0L;
/** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
+ private final int futId;
/** Node ID. */
@GridToStringExclude
@@ -1505,19 +1508,22 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
/**
* @param node Node.
* @param keys Keys.
+ * @param futId Mini future ID.
*/
MiniFuture(
ClusterNode node,
- Collection<KeyCacheObject> keys
+ Collection<KeyCacheObject> keys,
+ int futId
) {
this.node = node;
this.keys = keys;
+ this.futId = futId;
}
/**
* @return Future ID.
*/
- IgniteUuid futureId() {
+ int futureId() {
return futId;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index 9e12153..229961e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -50,7 +50,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
private AffinityTopologyVersion topVer;
/** Mini future ID. */
- private IgniteUuid miniId;
+ private int miniId;
/** Filter. */
private CacheEntryPredicate[] filter;
@@ -256,14 +256,14 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
/**
* @param miniId Mini future Id.
*/
- public void miniId(IgniteUuid miniId) {
+ public void miniId(int miniId) {
this.miniId = miniId;
}
@@ -423,7 +423,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
writer.incrementState();
case 28:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
@@ -464,12 +464,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
writer.incrementState();
- case 35:
- if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
}
return true;
@@ -551,7 +545,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
case 28:
- miniId = reader.readIgniteUuid("miniId");
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
@@ -606,14 +600,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
- case 35:
- partIds = reader.readCollection("partIds", MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
}
return reader.afterMessageRead(GridNearLockRequest.class);
@@ -626,7 +612,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 36;
+ return 35;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
index e48a098..b10591d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
@@ -47,7 +47,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
private Collection<GridCacheVersion> pending;
/** */
- private IgniteUuid miniId;
+ private int miniId;
/** DHT versions. */
@GridToStringInclude
@@ -85,7 +85,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
int cacheId,
GridCacheVersion lockVer,
IgniteUuid futId,
- IgniteUuid miniId,
+ int miniId,
boolean filterRes,
int cnt,
Throwable err,
@@ -94,7 +94,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
) {
super(cacheId, lockVer, futId, cnt, err, addDepInfo);
- assert miniId != null;
+ assert miniId != 0;
this.miniId = miniId;
this.clientRemapVer = clientRemapVer;
@@ -134,7 +134,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
@@ -233,7 +233,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
writer.incrementState();
case 14:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
@@ -293,7 +293,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
reader.incrementState();
case 14:
- miniId = reader.readIgniteUuid("miniId");
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index dbc8096..80508dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -56,9 +56,7 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteReducer;
-import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -70,9 +68,6 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING;
*/
public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter {
/** */
- public static final IgniteProductVersion SER_TX_SINCE = IgniteProductVersion.fromString("1.5.0");
-
- /** */
@GridToStringExclude
private KeyLockFuture keyLockFut;
@@ -80,6 +75,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
@GridToStringExclude
private ClientRemapFuture remapFut;
+ /** */
+ private int miniId;
+
/**
* @param cctx Context.
* @param tx Transaction.
@@ -153,7 +151,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
if (isMini(fut)) {
MiniFuture f = (MiniFuture) fut;
- if (f.node().id().equals(nodeId)) {
+ if (f.primary().id().equals(nodeId)) {
ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " +
nodeId);
@@ -186,7 +184,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
if (e instanceof IgniteTxOptimisticCheckedException || e instanceof IgniteTxTimeoutCheckedException) {
if (m != null)
- tx.removeMapping(m.node().id());
+ tx.removeMapping(m.primary().id());
}
ERR_UPD.compareAndSet(this, null, e);
@@ -227,7 +225,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
* @return Mini future.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- private MiniFuture miniFuture(IgniteUuid miniId) {
+ private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
synchronized (sync) {
int size = futuresCountNoLock();
@@ -241,7 +239,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
MiniFuture mini = (MiniFuture)fut;
- if (mini.futureId().equals(miniId)) {
+ if (mini.futureId() == miniId) {
if (!mini.isDone())
return mini;
else
@@ -339,15 +337,15 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
assert topVer.topologyVersion() > 0;
- txMapping = new GridDhtTxMapping();
+ GridDhtTxMapping txMapping = new GridDhtTxMapping();
Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
for (IgniteTxEntry write : writes)
- map(write, topVer, mappings, remap, topLocked);
+ map(write, topVer, mappings, txMapping, remap, topLocked);
for (IgniteTxEntry read : reads)
- map(read, topVer, mappings, remap, topLocked);
+ map(read, topVer, mappings, txMapping, remap, topLocked);
if (keyLockFut != null)
keyLockFut.onAllKeysAdded();
@@ -365,12 +363,12 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
tx.transactionNodes(txMapping.transactionNodes());
- checkOnePhase();
+ checkOnePhase(txMapping);
for (GridDistributedTxMapping m : mappings.values()) {
assert !m.empty();
- add(new MiniFuture(this, m));
+ add(new MiniFuture(this, m, ++miniId));
}
Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
@@ -385,7 +383,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
MiniFuture fut = (MiniFuture)fut0;
- IgniteCheckedException err = prepare(fut);
+ IgniteCheckedException err = prepare(fut, txMapping);
if (err != null) {
while (it.hasNext()) {
@@ -396,7 +394,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
fut = (MiniFuture)fut0;
- tx.removeMapping(fut.mapping().node().id());
+ tx.removeMapping(fut.mapping().primary().id());
fut.onResult(new IgniteCheckedException("Failed to prepare transaction.", err));
}
@@ -421,10 +419,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
* @param fut Mini future.
* @return Prepare error if any.
*/
- @Nullable private IgniteCheckedException prepare(final MiniFuture fut) {
+ @Nullable private IgniteCheckedException prepare(final MiniFuture fut, GridDhtTxMapping txMapping) {
GridDistributedTxMapping m = fut.mapping();
- final ClusterNode n = m.node();
+ final ClusterNode primary = m.primary();
long timeout = tx.remainingTime();
@@ -477,8 +475,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
req.miniId(fut.futureId());
// If this is the primary node for the keys.
- if (n.isLocal()) {
- IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req);
+ if (primary.isLocal()) {
+ IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(primary.id(),
+ tx,
+ req);
prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
@Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
@@ -493,7 +493,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
else {
try {
- cctx.io().send(n, req, tx.ioPolicy());
+ cctx.io().send(primary, req, tx.ioPolicy());
}
catch (ClusterTopologyCheckedException e) {
e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
@@ -523,6 +523,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
IgniteTxEntry entry,
AffinityTopologyVersion topVer,
Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> curMapping,
+ GridDhtTxMapping txMapping,
boolean remap,
boolean topLocked
) {
@@ -544,13 +545,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']');
}
- if (primary.version().compareTo(SER_TX_SINCE) < 0) {
- onDone(new IgniteCheckedException("Optimistic serializable transactions can be used only with node " +
- "version starting from " + SER_TX_SINCE));
-
- return;
- }
-
// Must re-initialize cached entry while holding topology lock.
if (cacheCtx.isNear())
entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer));
@@ -626,8 +620,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
Collection<String> futs = F.viewReadOnly(futures(),
new C1<IgniteInternalFuture<?>, String>() {
@Override public String apply(IgniteInternalFuture<?> f) {
- return "[node=" + ((MiniFuture)f).node().id() +
- ", loc=" + ((MiniFuture)f).node().isLocal() +
+ return "[node=" + ((MiniFuture)f).primary().id() +
+ ", loc=" + ((MiniFuture)f).primary().isLocal() +
", done=" + f.isDone() + "]";
}
},
@@ -654,7 +648,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
/**
* Constructor.
*/
- public ClientRemapFuture() {
+ ClientRemapFuture() {
super(new ClientRemapFutureReducer());
}
}
@@ -697,7 +691,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes");
/** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
+ private final int futId;
/** Parent future. */
private final GridNearOptimisticSerializableTxPrepareFuture parent;
@@ -713,24 +707,26 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
/**
* @param parent Parent future.
* @param m Mapping.
+ * @param futId Mini future ID.
*/
- MiniFuture(GridNearOptimisticSerializableTxPrepareFuture parent, GridDistributedTxMapping m) {
+ MiniFuture(GridNearOptimisticSerializableTxPrepareFuture parent, GridDistributedTxMapping m, int futId) {
this.parent = parent;
this.m = m;
+ this.futId = futId;
}
/**
* @return Future ID.
*/
- IgniteUuid futureId() {
+ int futureId() {
return futId;
}
/**
- * @return Node ID.
+ * @return Primary node.
*/
- public ClusterNode node() {
- return m.node();
+ public ClusterNode primary() {
+ return m.primary();
}
/**
@@ -795,7 +791,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
assert parent.cctx.kernalContext().clientNode();
assert m.clientFirst();
- parent.tx.removeMapping(m.node().id());
+ parent.tx.removeMapping(m.primary().id());
ClientRemapFuture remapFut0 = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index a2cb182..6189b38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -58,7 +58,6 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
-import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.transactions.TransactionTimeoutException;
import org.jetbrains.annotations.Nullable;
@@ -75,6 +74,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
@GridToStringExclude
private KeyLockFuture keyLockFut;
+ /** */
+ private int miniId;
+
+ /** */
+ private GridDhtTxMapping txMapping;
+
/**
* @param cctx Context.
* @param tx Transaction.
@@ -232,7 +237,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
* @return Mini future.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- private MiniFuture miniFuture(IgniteUuid miniId) {
+ private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
synchronized (sync) {
int size = futuresCountNoLock();
@@ -246,7 +251,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
MiniFuture mini = (MiniFuture)fut;
- if (mini.futureId().equals(miniId)) {
+ if (mini.futureId() == miniId) {
if (!mini.isDone())
return mini;
else
@@ -352,7 +357,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
GridDistributedTxMapping mapping = map(write, topVer, null, topLocked, remap);
- if (mapping.node().isLocal()) {
+ if (mapping.primary().isLocal()) {
if (write.context().isNear())
tx.nearLocallyMapped(true);
else if (write.context().isColocated())
@@ -377,7 +382,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
tx.transactionNodes(txMapping.transactionNodes());
- checkOnePhase();
+ checkOnePhase(txMapping);
proceedPrepare(mapping, null);
}
@@ -414,12 +419,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
updated.last(true);
- GridDistributedTxMapping prev = map.put(updated.node().id(), updated);
+ GridDistributedTxMapping prev = map.put(updated.primary().id(), updated);
if (prev != null)
prev.last(false);
- if (updated.node().isLocal()) {
+ if (updated.primary().isLocal()) {
if (write.context().isNear())
tx.nearLocallyMapped(true);
else if (write.context().isColocated())
@@ -446,7 +451,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
tx.transactionNodes(txMapping.transactionNodes());
- checkOnePhase();
+ checkOnePhase(txMapping);
proceedPrepare(mappings);
}
@@ -480,7 +485,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
try {
assert !m.empty();
- final ClusterNode n = m.node();
+ final ClusterNode n = m.primary();
long timeout = tx.remainingTime();
@@ -521,7 +526,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
}
- final MiniFuture fut = new MiniFuture(this, m, mappings);
+ final MiniFuture fut = new MiniFuture(this, m, ++miniId, mappings);
req.miniId(fut.futureId());
@@ -639,7 +644,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
}
- if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
+ if (cur == null || !cur.primary().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
boolean clientFirst = cur == null && !topLocked && cctx.kernalContext().clientNode();
cur = new GridDistributedTxMapping(primary);
@@ -771,7 +776,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
private final GridNearOptimisticTxPrepareFuture parent;
/** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
+ private final int futId;
/** Keys. */
@GridToStringInclude
@@ -787,19 +792,23 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
/**
* @param parent Parent.
* @param m Mapping.
+ * @param futId Mini future ID.
* @param mappings Queue of mappings to proceed with.
*/
- MiniFuture(GridNearOptimisticTxPrepareFuture parent, GridDistributedTxMapping m,
+ MiniFuture(GridNearOptimisticTxPrepareFuture parent,
+ GridDistributedTxMapping m,
+ int futId,
Queue<GridDistributedTxMapping> mappings) {
this.parent = parent;
this.m = m;
+ this.futId = futId;
this.mappings = mappings;
}
/**
* @return Future ID.
*/
- IgniteUuid futureId() {
+ int futureId() {
return futId;
}
@@ -807,7 +816,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
* @return Node ID.
*/
public ClusterNode node() {
- return m.node();
+ return m.primary();
}
/**
@@ -840,7 +849,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
void onNodeLeft(ClusterTopologyCheckedException e, boolean discoThread) {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near optimistic prepare fut, mini future node left [txId=" + parent.tx.nearXidVersion() +
- ", node=" + m.node().id() + ']');
+ ", node=" + m.primary().id() + ']');
}
if (isDone())
[2/4] ignite git commit: Changed tx mini future ids from IgniteUuid
to int, removed some legacy code from tx processing.
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index a4132f2..4a443a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -44,7 +45,6 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -77,7 +77,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
for (IgniteInternalFuture<?> fut : futures()) {
MiniFuture f = (MiniFuture)fut;
- if (f.node().id().equals(nodeId)) {
+ if (f.primary().id().equals(nodeId)) {
ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " +
nodeId);
@@ -100,7 +100,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
MiniFuture f = miniFuture(res.miniId());
if (f != null) {
- assert f.node().id().equals(nodeId);
+ assert f.primary().id().equals(nodeId);
f.onResult(res);
}
@@ -130,16 +130,16 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
* @return Mini future.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- private MiniFuture miniFuture(IgniteUuid miniId) {
+ private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
synchronized (sync) {
int size = futuresCountNoLock();
// Avoid iterator creation.
for (int i = 0; i < size; i++) {
- MiniFuture mini = (MiniFuture) future(i);
+ MiniFuture mini = (MiniFuture)future(i);
- if (mini.futureId().equals(miniId)) {
+ if (mini.futureId() == miniId) {
if (!mini.isDone())
return mini;
else
@@ -188,16 +188,22 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
AffinityTopologyVersion topVer = tx.topologyVersion();
- txMapping = new GridDhtTxMapping();
+ GridDhtTxMapping txMapping = new GridDhtTxMapping();
for (IgniteTxEntry txEntry : tx.allEntries()) {
txEntry.clearEntryReadVersion();
GridCacheContext cacheCtx = txEntry.context();
- List<ClusterNode> nodes = cacheCtx.isLocal() ?
- cacheCtx.affinity().nodesByKey(txEntry.key(), topVer) :
- cacheCtx.topology().nodes(cacheCtx.affinity().partition(txEntry.key()), topVer);
+ List<ClusterNode> nodes;
+
+ if (!cacheCtx.isLocal()) {
+ GridDhtPartitionTopology top = cacheCtx.topology();
+
+ nodes = top.nodes(cacheCtx.affinity().partition(txEntry.key()), topVer);
+ }
+ else
+ nodes = cacheCtx.affinity().nodesByKey(txEntry.key(), topVer);
ClusterNode primary = F.first(nodes);
@@ -224,15 +230,20 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
tx.transactionNodes(txMapping.transactionNodes());
- checkOnePhase();
+ checkOnePhase(txMapping);
long timeout = tx.remainingTime();
- if (timeout == -1)
+ if (timeout == -1) {
onDone(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + tx));
+ return;
+ }
+
+ int miniId = 0;
+
for (final GridDistributedTxMapping m : mappings.values()) {
- final ClusterNode node = m.node();
+ final ClusterNode primary = m.primary();
GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
futId,
@@ -258,14 +269,14 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
req.addDhtVersion(txEntry.txKey(), null);
}
- final MiniFuture fut = new MiniFuture(m);
+ final MiniFuture fut = new MiniFuture(m, ++miniId);
req.miniId(fut.futureId());
add(fut);
- if (node.isLocal()) {
- IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(node.id(),
+ if (primary.isLocal()) {
+ IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(primary.id(),
tx,
req);
@@ -282,11 +293,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
}
else {
try {
- cctx.io().send(node, req, tx.ioPolicy());
+ cctx.io().send(primary, req, tx.ioPolicy());
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near pessimistic prepare, sent request [txId=" + tx.nearXidVersion() +
- ", node=" + node.id() + ']');
+ ", node=" + primary.id() + ']');
}
}
catch (ClusterTopologyCheckedException e) {
@@ -297,7 +308,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
catch (IgniteCheckedException e) {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near pessimistic prepare, failed send request [txId=" + tx.nearXidVersion() +
- ", node=" + node.id() + ", err=" + e + ']');
+ ", node=" + primary.id() + ", err=" + e + ']');
}
fut.onError(e);
@@ -338,8 +349,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
@Override public String toString() {
Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
@Override public String apply(IgniteInternalFuture<?> f) {
- return "[node=" + ((MiniFuture)f).node().id() +
- ", loc=" + ((MiniFuture)f).node().isLocal() +
+ return "[node=" + ((MiniFuture)f).primary().id() +
+ ", loc=" + ((MiniFuture)f).primary().isLocal() +
", done=" + f.isDone() + "]";
}
});
@@ -357,30 +368,32 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
private static final long serialVersionUID = 0L;
/** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
+ private final int futId;
/** */
private GridDistributedTxMapping m;
/**
* @param m Mapping.
+ * @param futId Mini future ID.
*/
- MiniFuture(GridDistributedTxMapping m) {
+ MiniFuture(GridDistributedTxMapping m, int futId) {
this.m = m;
+ this.futId = futId;
}
/**
* @return Future ID.
*/
- IgniteUuid futureId() {
+ int futureId() {
return futId;
}
/**
* @return Node ID.
*/
- public ClusterNode node() {
- return m.node();
+ public ClusterNode primary() {
+ return m.primary();
}
/**
@@ -402,7 +415,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
void onNodeLeft(ClusterTopologyCheckedException e) {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near pessimistic prepare, mini future node left [txId=" + tx.nearXidVersion() +
- ", nodeId=" + m.node().id() + ']');
+ ", nodeId=" + m.primary().id() + ']');
}
if (tx.onePhaseCommit()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
index 1a925f3..994172b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteCodeGeneratingFail;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
@@ -38,25 +37,24 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_ST
/**
*
*/
-@IgniteCodeGeneratingFail // partId read should not be generated by MessageCodeGenerator.
public class GridNearSingleGetRequest extends GridCacheMessage implements GridCacheDeployable {
/** */
private static final long serialVersionUID = 0L;
/** */
- public static final int READ_THROUGH_FLAG_MASK = 0x01;
+ private static final int READ_THROUGH_FLAG_MASK = 0x01;
/** */
- public static final int SKIP_VALS_FLAG_MASK = 0x02;
+ private static final int SKIP_VALS_FLAG_MASK = 0x02;
/** */
- public static final int ADD_READER_FLAG_MASK = 0x04;
+ private static final int ADD_READER_FLAG_MASK = 0x04;
/** */
- public static final int NEED_VER_FLAG_MASK = 0x08;
+ private static final int NEED_VER_FLAG_MASK = 0x08;
/** */
- public static final int NEED_ENTRY_INFO_FLAG_MASK = 0x10;
+ private static final int NEED_ENTRY_INFO_FLAG_MASK = 0x10;
/** Future ID. */
private long futId;
@@ -64,9 +62,6 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa
/** */
private KeyCacheObject key;
- /** Partition ID. */
- private int partId = -1;
-
/** Flags. */
private byte flags;
@@ -128,7 +123,6 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa
this.cacheId = cacheId;
this.futId = futId;
this.key = key;
- this.partId = key.partition();
this.topVer = topVer;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
@@ -202,7 +196,9 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa
/** {@inheritDoc} */
@Override public int partition() {
- return partId;
+ assert key != null;
+
+ return key.partition();
}
/**
@@ -257,8 +253,6 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa
assert key != null;
- key.partition(partId);
-
GridCacheContext cctx = ctx.cacheContext(cacheId);
key.finishUnmarshal(cctx.cacheObjectContext(), ldr);
@@ -316,14 +310,6 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa
reader.incrementState();
case 8:
- partId = reader.readInt("partId", -1);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -331,7 +317,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa
reader.incrementState();
- case 10:
+ case 9:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -339,7 +325,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa
reader.incrementState();
- case 11:
+ case 10:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -398,24 +384,18 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa
writer.incrementState();
case 8:
- if (!writer.writeInt("partId", partId))
- return false;
-
- writer.incrementState();
-
- case 9:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 10:
+ case 9:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 11:
+ case 10:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -438,7 +418,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 12;
+ return 11;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 512f63e..7387501 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -54,12 +55,11 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionRollbackException;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
@@ -69,18 +69,6 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFuture<IgniteInternalTx>
implements GridCacheFuture<IgniteInternalTx> {
/** */
- public static final IgniteProductVersion FINISH_NEAR_ONE_PHASE_SINCE = IgniteProductVersion.fromString("1.4.0");
-
- /** */
- public static final IgniteProductVersion WAIT_REMOTE_TXS_SINCE = IgniteProductVersion.fromString("1.5.1");
-
- /** */
- public static final IgniteProductVersion PRIMARY_SYNC_TXS_SINCE = IgniteProductVersion.fromString("1.6.0");
-
- /** */
- public static final IgniteProductVersion ACK_DHT_ONE_PHASE_SINCE = IgniteProductVersion.fromString("1.6.8");
-
- /** */
private static final long serialVersionUID = 0L;
/** Logger reference. */
@@ -157,7 +145,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
@Override public boolean onNodeLeft(UUID nodeId) {
boolean found = false;
- for (IgniteInternalFuture<?> fut : futures())
+ for (IgniteInternalFuture<?> fut : futures()) {
if (isMini(fut)) {
MinFuture f = (MinFuture)fut;
@@ -168,6 +156,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
found = true;
}
}
+ }
return found;
}
@@ -209,8 +198,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (fut.getClass() == FinishMiniFuture.class) {
FinishMiniFuture f = (FinishMiniFuture)fut;
- if (f.futureId().equals(res.miniId())) {
- assert f.node().id().equals(nodeId);
+ if (f.futureId() == res.miniId()) {
+ assert f.primary().id().equals(nodeId);
finishFut = f;
@@ -253,7 +242,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (fut.getClass() == CheckBackupMiniFuture.class) {
CheckBackupMiniFuture f = (CheckBackupMiniFuture)fut;
- if (f.futureId().equals(res.miniId())) {
+ if (f.futureId() == res.miniId()) {
found = true;
assert f.node().id().equals(nodeId);
@@ -267,7 +256,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
else if (fut.getClass() == CheckRemoteTxMiniFuture.class) {
CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
- if (f.futureId().equals(res.miniId()))
+ if (f.futureId() == res.miniId())
f.onDhtFinishResponse(nodeId, false);
}
}
@@ -298,9 +287,14 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (isDone())
return false;
- if (err != null)
+ boolean nodeStop = false;
+
+ if (err != null) {
tx.setRollbackOnly();
+ nodeStop = err instanceof NodeStoppingException;
+ }
+
if (commit) {
if (tx.commitError() != null)
err = tx.commitError();
@@ -329,7 +323,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (tx.onePhaseCommit()) {
boolean commit = this.commit && err == null;
- finishOnePhase(commit);
+ if (!nodeStop)
+ finishOnePhase(commit);
try {
tx.tmFinish(commit);
@@ -412,8 +407,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (mappings.single()) {
GridDistributedTxMapping mapping = mappings.singleMapping();
- if (mapping != null)
- finish(mapping, commit);
+ if (mapping != null) {
+ assert !hasFutures() : futures();
+
+ finish(1, mapping, commit);
+ }
}
else
finish(mappings.mappings(), commit);
@@ -453,7 +451,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
GridDistributedTxMapping mapping = mappings.singleMapping();
if (mapping != null) {
- UUID nodeId = mapping.node().id();
+ UUID nodeId = mapping.primary().id();
Collection<UUID> backups = tx.transactionNodes().get(nodeId);
@@ -470,10 +468,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
else if (backup.isLocal())
cctx.tm().removeTxReturn(tx.xidVersion());
- else {
- if (ACK_DHT_ONE_PHASE_SINCE.compareToIgnoreTimestamp(backup.version()) <= 0)
- cctx.tm().sendDeferredAckResponse(backupId, tx.xidVersion());
- }
+ else
+ cctx.tm().sendDeferredAckResponse(backupId, tx.xidVersion());
}
}
}
@@ -482,10 +478,12 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
*
*/
private void checkBackup() {
+ assert !hasFutures() : futures();
+
GridDistributedTxMapping mapping = mappings.singleMapping();
if (mapping != null) {
- UUID nodeId = mapping.node().id();
+ UUID nodeId = mapping.primary().id();
Collection<UUID> backups = tx.transactionNodes().get(nodeId);
@@ -509,7 +507,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
"(backup has left grid): " + tx.xidVersion(), cause));
}
else {
- final CheckBackupMiniFuture mini = new CheckBackupMiniFuture(backup, mapping);
+ final CheckBackupMiniFuture mini = new CheckBackupMiniFuture(1, backup, mapping);
add(mini);
@@ -575,24 +573,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
else {
GridDhtTxFinishRequest finishReq = checkCommittedRequest(mini.futureId(), false);
- // Preserve old behavior, otherwise response is not sent.
- if (WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) > 0)
- finishReq.syncCommit(true);
-
try {
- if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(backup.version()) <= 0) {
- cctx.io().send(backup, finishReq, tx.ioPolicy());
+ cctx.io().send(backup, finishReq, tx.ioPolicy());
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Near finish fut, sent check committed request [" +
- "txId=" + tx.nearXidVersion() +
- ", node=" + backup.id() + ']');
- }
- }
- else {
- mini.onDone(new IgniteTxHeuristicCheckedException("Failed to check for tx commit on " +
- "the backup node (node has an old Ignite version) [rmtNodeId=" + backup.id() +
- ", ver=" + backup.version() + ']'));
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Near finish fut, sent check committed request [" +
+ "txId=" + tx.nearXidVersion() +
+ ", node=" + backup.id() + ']');
}
}
catch (ClusterTopologyCheckedException ignored) {
@@ -624,18 +611,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (tx.mappings().empty())
return false;
- boolean finish = tx.txState().hasNearCache(cctx) || !commit;
-
- if (finish) {
- GridDistributedTxMapping mapping = tx.mappings().singleMapping();
-
- assert mapping != null : tx;
-
- if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(mapping.node().version()) > 0)
- finish = false;
- }
-
- return finish;
+ return tx.txState().hasNearCache(cctx) || !commit;
}
/**
@@ -683,17 +659,22 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* @param commit Commit flag.
*/
private void finish(Iterable<GridDistributedTxMapping> mappings, boolean commit) {
+ assert !hasFutures() : futures();
+
+ int miniId = 0;
+
// Create mini futures.
for (GridDistributedTxMapping m : mappings)
- finish(m, commit);
+ finish(++miniId, m, commit);
}
/**
+ * @param miniId Mini future ID.
* @param m Mapping.
* @param commit Commit flag.
*/
- private void finish(GridDistributedTxMapping m, boolean commit) {
- ClusterNode n = m.node();
+ private void finish(int miniId, GridDistributedTxMapping m, boolean commit) {
+ ClusterNode n = m.primary();
assert !m.empty();
@@ -728,7 +709,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
// If this is the primary node for the keys.
if (n.isLocal()) {
- req.miniId(IgniteUuid.randomUuid());
+ req.miniId(miniId);
IgniteInternalFuture<IgniteInternalTx> fut = cctx.tm().txHandler().finish(n.id(), tx, req);
@@ -737,7 +718,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
add(fut);
}
else {
- FinishMiniFuture fut = new FinishMiniFuture(m);
+ FinishMiniFuture fut = new FinishMiniFuture(miniId, m);
req.miniId(fut.futureId());
@@ -755,12 +736,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
", node=" + n.id() + ']');
}
- boolean wait;
-
- if (syncMode == PRIMARY_SYNC)
- wait = n.version().compareToIgnoreTimestamp(PRIMARY_SYNC_TXS_SINCE) >= 0;
- else
- wait = syncMode == FULL_SYNC;
+ boolean wait = syncMode != FULL_ASYNC;
// If we don't wait for result, then mark future as done.
if (!wait)
@@ -768,7 +744,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
catch (ClusterTopologyCheckedException ignored) {
// Remove previous mapping.
- mappings.remove(m.node().id());
+ mappings.remove(m.primary().id());
fut.onNodeLeft(n.id(), false);
}
@@ -794,7 +770,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (f.getClass() == FinishMiniFuture.class) {
FinishMiniFuture fut = (FinishMiniFuture)f;
- ClusterNode node = fut.node();
+ ClusterNode node = fut.primary();
if (node != null) {
return "FinishFuture[node=" + node.id() +
@@ -837,7 +813,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* @param waitRemoteTxs Wait for remote txs.
* @return Finish request.
*/
- private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId, boolean waitRemoteTxs) {
+ private GridDhtTxFinishRequest checkCommittedRequest(int miniId, boolean waitRemoteTxs) {
GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
cctx.localNodeId(),
futureId(),
@@ -852,8 +828,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
tx.system(),
tx.ioPolicy(),
false,
- tx.syncMode() == FULL_SYNC,
- tx.syncMode() == FULL_SYNC,
+ tx.syncMode(),
null,
null,
null,
@@ -875,7 +850,14 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
*/
private abstract class MinFuture extends GridFutureAdapter<IgniteInternalTx> {
/** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
+ private final int futId;
+
+ /**
+ * @param futId Future ID.
+ */
+ MinFuture(int futId) {
+ this.futId = futId;
+ }
/**
* @param nodeId Node ID.
@@ -887,14 +869,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
/**
* @return Future ID.
*/
- final IgniteUuid futureId() {
+ final int futureId() {
return futId;
}
}
/**
- * Mini-future for get operations. Mini-futures are only waiting on a single
- * node as opposed to multiple nodes.
+ *
*/
private class FinishMiniFuture extends MinFuture {
/** */
@@ -905,17 +886,20 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
private GridDistributedTxMapping m;
/**
+ * @param futId Future ID.
* @param m Mapping.
*/
- FinishMiniFuture(GridDistributedTxMapping m) {
+ FinishMiniFuture(int futId, GridDistributedTxMapping m) {
+ super(futId);
+
this.m = m;
}
/**
* @return Node ID.
*/
- ClusterNode node() {
- return m.node();
+ ClusterNode primary() {
+ return m.primary();
}
/**
@@ -927,10 +911,10 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
/** {@inheritDoc} */
boolean onNodeLeft(UUID nodeId, boolean discoThread) {
- if (nodeId.equals(m.node().id())) {
+ if (nodeId.equals(m.primary().id())) {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near finish fut, mini future node left [txId=" + tx.nearXidVersion() +
- ", node=" + m.node().id() + ']');
+ ", node=" + m.primary().id() + ']');
}
if (tx.syncMode() == FULL_SYNC) {
@@ -940,16 +924,22 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
Collection<UUID> backups = txNodes.get(nodeId);
if (!F.isEmpty(backups)) {
- final CheckRemoteTxMiniFuture mini = new CheckRemoteTxMiniFuture(new HashSet<>(backups));
+ final CheckRemoteTxMiniFuture mini;
+
+ synchronized (sync) {
+ int futId = Integer.MIN_VALUE + futuresCountNoLock();
+
+ mini = new CheckRemoteTxMiniFuture(futId, new HashSet<>(backups));
- add(mini);
+ add(mini);
+ }
GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId(), true);
for (UUID backupId : backups) {
ClusterNode backup = cctx.discovery().node(backupId);
- if (backup != null && WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) <= 0) {
+ if (backup != null) {
if (backup.isLocal()) {
IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(tx.nearXidVersion());
@@ -1014,10 +1004,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
private ClusterNode backup;
/**
+ * @param futId Future ID.
* @param backup Backup to check.
* @param m Mapping associated with the backup.
*/
- CheckBackupMiniFuture(ClusterNode backup, GridDistributedTxMapping m) {
+ CheckBackupMiniFuture(int futId, ClusterNode backup, GridDistributedTxMapping m) {
+ super(futId);
+
this.backup = backup;
this.m = m;
}
@@ -1075,9 +1068,12 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
private Set<UUID> nodes;
/**
+ * @param futId Future ID.
* @param nodes Backup nodes.
*/
- public CheckRemoteTxMiniFuture(Set<UUID> nodes) {
+ CheckRemoteTxMiniFuture(int futId, Set<UUID> nodes) {
+ super(futId);
+
this.nodes = nodes;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index dfbbe18..05c1f3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -40,25 +40,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
private static final long serialVersionUID = 0L;
/** Mini future ID. */
- private IgniteUuid miniId;
-
- /** Explicit lock flag. */
- private boolean explicitLock;
-
- /** Store enabled flag. */
- private boolean storeEnabled;
-
- /** Topology version. */
- private AffinityTopologyVersion topVer;
-
- /** Subject ID. */
- private UUID subjId;
-
- /** Task name hash. */
- private int taskNameHash;
-
- /** Write synchronization mode. */
- private CacheWriteSynchronizationMode syncMode;
+ private int miniId;
/**
* Empty constructor required for {@link Externalizable}.
@@ -109,83 +91,69 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
super(
xidVer,
futId,
+ topVer,
null,
threadId,
commit,
invalidate,
sys,
plc,
- syncMode == CacheWriteSynchronizationMode.FULL_SYNC,
- syncMode == CacheWriteSynchronizationMode.FULL_SYNC,
+ syncMode,
baseVer,
committedVers,
rolledbackVers,
+ subjId,
+ taskNameHash,
txSize,
addDepInfo
);
- this.syncMode = syncMode;
- this.explicitLock = explicitLock;
- this.storeEnabled = storeEnabled;
- this.topVer = topVer;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
+ explicitLock(explicitLock);
+ storeEnabled(storeEnabled);
}
/**
- * @return Transaction write synchronization mode (can be null is message sent from old nodes).
+ * @return Explicit lock flag.
*/
- @Nullable public CacheWriteSynchronizationMode syncMode() {
- return syncMode;
+ public boolean explicitLock() {
+ return isFlag(EXPLICIT_LOCK_FLAG_MASK);
}
/**
- * @return Explicit lock flag.
+ * @param explicitLock Explicit lock flag.
*/
- public boolean explicitLock() {
- return explicitLock;
+ private void explicitLock(boolean explicitLock) {
+ setFlag(explicitLock, EXPLICIT_LOCK_FLAG_MASK);
}
/**
* @return Store enabled flag.
*/
public boolean storeEnabled() {
- return storeEnabled;
+ return isFlag(STORE_ENABLED_FLAG_MASK);
}
/**
- * @return Mini future ID.
+ * @param storeEnabled Store enabled flag.
*/
- public IgniteUuid miniId() {
- return miniId;
+ private void storeEnabled(boolean storeEnabled) {
+ setFlag(storeEnabled, STORE_ENABLED_FLAG_MASK);
}
/**
- * @param miniId Mini future ID.
+ * @return Mini future ID.
*/
- public void miniId(IgniteUuid miniId) {
- this.miniId = miniId;
+ public int miniId() {
+ return miniId;
}
/**
- * @return Subject ID.
+ * @param miniId Mini future ID.
*/
- @Nullable public UUID subjectId() {
- return subjId;
- }
+ public void miniId(int miniId) {
+ assert miniId > 0;
- /**
- * @return Task name hash.
- */
- public int taskNameHash() {
- return taskNameHash;
- }
-
- /**
- * @return Topology version.
- */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
+ this.miniId = miniId;
}
/** {@inheritDoc} */
@@ -203,44 +171,8 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
}
switch (writer.state()) {
- case 18:
- if (!writer.writeBoolean("explicitLock", explicitLock))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeIgniteUuid("miniId", miniId))
- return false;
-
- writer.incrementState();
-
- case 20:
- if (!writer.writeBoolean("storeEnabled", storeEnabled))
- return false;
-
- writer.incrementState();
-
case 21:
- if (!writer.writeUuid("subjId", subjId))
- return false;
-
- writer.incrementState();
-
- case 22:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 23:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 24:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
@@ -261,60 +193,8 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
return false;
switch (reader.state()) {
- case 18:
- explicitLock = reader.readBoolean("explicitLock");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- miniId = reader.readIgniteUuid("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 20:
- storeEnabled = reader.readBoolean("storeEnabled");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
case 21:
- subjId = reader.readUuid("subjId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 22:
- byte syncModeOrd;
-
- syncModeOrd = reader.readByte("syncMode");
-
- if (!reader.isLastRead())
- return false;
-
- syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
- reader.incrementState();
-
- case 23:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 24:
- topVer = reader.readMessage("topVer");
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
@@ -333,7 +213,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 25;
+ return 22;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
index 37fbb36..310e90d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
@@ -46,7 +46,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
private byte[] errBytes;
/** Mini future ID. */
- private IgniteUuid miniId;
+ private int miniId;
/** Near tx thread ID. */
private long nearThreadId;
@@ -59,17 +59,23 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param nearThreadId Near tx thread ID.
* @param futId Future ID.
* @param miniId Mini future Id.
* @param err Error.
*/
- public GridNearTxFinishResponse(GridCacheVersion xid, long nearThreadId, IgniteUuid futId, IgniteUuid miniId,
- @Nullable Throwable err) {
- super(xid, futId);
+ public GridNearTxFinishResponse(int part,
+ GridCacheVersion xid,
+ long nearThreadId,
+ IgniteUuid futId,
+ int miniId,
+ @Nullable Throwable err)
+ {
+ super(part, xid, futId);
- assert miniId != null;
+ assert miniId != 0;
this.nearThreadId = nearThreadId;
this.miniId = miniId;
@@ -84,7 +90,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
@@ -127,19 +133,19 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
}
switch (writer.state()) {
- case 5:
+ case 7:
if (!writer.writeByteArray("errBytes", errBytes))
return false;
writer.incrementState();
- case 6:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ case 8:
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
- case 7:
+ case 9:
if (!writer.writeLong("nearThreadId", nearThreadId))
return false;
@@ -161,7 +167,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
return false;
switch (reader.state()) {
- case 5:
+ case 7:
errBytes = reader.readByteArray("errBytes");
if (!reader.isLastRead())
@@ -169,15 +175,15 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
reader.incrementState();
- case 6:
- miniId = reader.readIgniteUuid("miniId");
+ case 8:
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 7:
+ case 9:
nearThreadId = reader.readLong("nearThreadId");
if (!reader.isLastRead())
@@ -197,7 +203,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 8;
+ return 10;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index a97b0fe..8ed749c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -28,6 +28,7 @@ import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -577,12 +578,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
void addEntryMapping(@Nullable Collection<GridDistributedTxMapping> maps) {
if (!F.isEmpty(maps)) {
for (GridDistributedTxMapping map : maps) {
- ClusterNode n = map.node();
+ ClusterNode primary = map.primary();
- GridDistributedTxMapping m = mappings.get(n.id());
+ GridDistributedTxMapping m = mappings.get(primary.id());
if (m == null) {
- mappings.put(m = new GridDistributedTxMapping(n));
+ mappings.put(m = new GridDistributedTxMapping(primary));
m.near(map.near());
@@ -605,7 +606,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
* @param entry Entry.
*/
void addSingleEntryMapping(GridDistributedTxMapping map, IgniteTxEntry entry) {
- ClusterNode n = map.node();
+ ClusterNode n = map.primary();
GridDistributedTxMapping m = new GridDistributedTxMapping(n);
@@ -883,7 +884,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
catch (IgniteCheckedException e) {
COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e);
- fut0.finish(false);
+ if (!(e instanceof NodeStoppingException))
+ fut0.finish(false);
}
}
});
@@ -1000,7 +1002,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
cctx,
this,
timeout,
- IgniteUuid.randomUuid(),
+ 0,
Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
last,
needReturnValue() && implicit());
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 6b95309..a0f28c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -95,9 +95,6 @@ public abstract class GridNearTxPrepareFutureAdapter extends
/** Trackable flag. */
protected boolean trackable = true;
- /** Full information about transaction nodes mapping. */
- protected GridDhtTxMapping txMapping;
-
/**
* @param cctx Context.
* @param tx Transaction.
@@ -160,8 +157,10 @@ public abstract class GridNearTxPrepareFutureAdapter extends
/**
* Checks if mapped transaction can be committed on one phase.
* One-phase commit can be done if transaction maps to one primary node and not more than one backup.
+ *
+ * @param txMapping Transaction mapping.
*/
- protected final void checkOnePhase() {
+ protected final void checkOnePhase(GridDhtTxMapping txMapping) {
if (tx.storeUsed())
return;
@@ -184,14 +183,13 @@ public abstract class GridNearTxPrepareFutureAdapter extends
* @param res Response.
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- protected final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
+ final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
if (res == null)
return;
assert res.error() == null : res;
- assert F.isEmpty(res.invalidPartitions()) : res;
- UUID nodeId = m.node().id();
+ UUID nodeId = m.primary().id();
for (Map.Entry<IgniteTxKey, CacheVersionedValue> entry : res.ownedValues().entrySet()) {
IgniteTxEntry txEntry = tx.entry(entry.getKey());
@@ -207,8 +205,11 @@ public abstract class GridNearTxPrepareFutureAdapter extends
CacheVersionedValue tup = entry.getValue();
- nearEntry.resetFromPrimary(tup.value(), tx.xidVersion(),
- tup.version(), nodeId, tx.topologyVersion());
+ nearEntry.resetFromPrimary(tup.value(),
+ tx.xidVersion(),
+ tup.version(),
+ nodeId,
+ tx.topologyVersion());
}
else if (txEntry.cached().detached()) {
GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached();
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index e55566b..ffeeb51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -23,17 +23,15 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
-import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
@@ -45,43 +43,36 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ private static final int NEAR_FLAG_MASK = 0x01;
+
+ /** */
+ private static final int FIRST_CLIENT_REQ_FLAG_MASK = 0x02;
+
+ /** */
+ private static final int IMPLICIT_SINGLE_FLAG_MASK = 0x04;
+
+ /** */
+ private static final int EXPLICIT_LOCK_FLAG_MASK = 0x08;
+
/** Future ID. */
private IgniteUuid futId;
/** Mini future ID. */
- private IgniteUuid miniId;
-
- /** Near mapping flag. */
- private boolean near;
+ private int miniId;
/** Topology version. */
private AffinityTopologyVersion topVer;
- /** {@code True} if this last prepare request for node. */
- private boolean last;
-
- /** IDs of backup nodes receiving last prepare request during this prepare. */
- @GridDirectCollection(UUID.class)
- @GridToStringInclude
- private Collection<UUID> lastBackups;
-
- /** Need return value flag. */
- private boolean retVal;
-
- /** Implicit single flag. */
- private boolean implicitSingle;
-
- /** Explicit lock flag. Set to true if at least one entry was explicitly locked. */
- private boolean explicitLock;
-
/** Subject ID. */
private UUID subjId;
/** Task name hash. */
private int taskNameHash;
- /** {@code True} if first optimistic tx prepare request sent from client node. */
- private boolean firstClientReq;
+ /** */
+ @GridToStringExclude
+ private byte flags;
/**
* Empty constructor required for {@link Externalizable}.
@@ -128,43 +119,42 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
boolean firstClientReq,
boolean addDepInfo
) {
- super(tx, timeout, reads, writes, txNodes, onePhaseCommit, addDepInfo);
+ super(tx,
+ timeout,
+ reads,
+ writes,
+ txNodes,
+ retVal,
+ last,
+ onePhaseCommit,
+ addDepInfo);
assert futId != null;
assert !firstClientReq || tx.optimistic() : tx;
this.futId = futId;
this.topVer = topVer;
- this.near = near;
- this.last = last;
- this.retVal = retVal;
- this.implicitSingle = implicitSingle;
- this.explicitLock = explicitLock;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
- this.firstClientReq = firstClientReq;
- }
- /**
- * @return {@code True} if first optimistic tx prepare request sent from client node.
- */
- public boolean firstClientRequest() {
- return firstClientReq;
+ setFlag(near, NEAR_FLAG_MASK);
+ setFlag(implicitSingle, IMPLICIT_SINGLE_FLAG_MASK);
+ setFlag(explicitLock, EXPLICIT_LOCK_FLAG_MASK);
+ setFlag(firstClientReq, FIRST_CLIENT_REQ_FLAG_MASK);
}
-
/**
- * @return {@code True} if this last prepare request for node.
+ * @return {@code True} if first optimistic tx prepare request sent from client node.
*/
- public boolean last() {
- return last;
+ public final boolean firstClientRequest() {
+ return isFlag(FIRST_CLIENT_REQ_FLAG_MASK);
}
/**
* @return {@code True} if mapping is for near-enabled caches.
*/
- public boolean near() {
- return near;
+ public final boolean near() {
+ return isFlag(NEAR_FLAG_MASK);
}
/**
@@ -177,14 +167,14 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
/**
* @param miniId Mini future ID.
*/
- public void miniId(IgniteUuid miniId) {
+ public void miniId(int miniId) {
this.miniId = miniId;
}
@@ -203,24 +193,17 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
}
/**
- * @return Whether return value is requested.
- */
- public boolean returnValue() {
- return retVal;
- }
-
- /**
* @return Implicit single flag.
*/
- public boolean implicitSingle() {
- return implicitSingle;
+ public final boolean implicitSingle() {
+ return isFlag(IMPLICIT_SINGLE_FLAG_MASK);
}
/**
* @return Explicit lock flag.
*/
- public boolean explicitLock() {
- return explicitLock;
+ public final boolean explicitLock() {
+ return isFlag(EXPLICIT_LOCK_FLAG_MASK);
}
/**
@@ -269,6 +252,26 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
return true;
}
+ /**
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ private void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ private boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -284,73 +287,37 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
}
switch (writer.state()) {
- case 23:
- if (!writer.writeBoolean("explicitLock", explicitLock))
- return false;
-
- writer.incrementState();
-
- case 24:
- if (!writer.writeBoolean("firstClientReq", firstClientReq))
+ case 20:
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
- case 25:
+ case 21:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 26:
- if (!writer.writeBoolean("implicitSingle", implicitSingle))
- return false;
-
- writer.incrementState();
-
- case 27:
- if (!writer.writeBoolean("last", last))
- return false;
-
- writer.incrementState();
-
- case 28:
- if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID))
- return false;
-
- writer.incrementState();
-
- case 29:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ case 22:
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
- case 30:
- if (!writer.writeBoolean("near", near))
- return false;
-
- writer.incrementState();
-
- case 31:
- if (!writer.writeBoolean("retVal", retVal))
- return false;
-
- writer.incrementState();
-
- case 32:
+ case 23:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 33:
+ case 24:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 34:
+ case 25:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -372,23 +339,15 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
return false;
switch (reader.state()) {
- case 23:
- explicitLock = reader.readBoolean("explicitLock");
+ case 20:
+ flags = reader.readByte("flags");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 24:
- firstClientReq = reader.readBoolean("firstClientReq");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 25:
+ case 21:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -396,55 +355,15 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 26:
- implicitSingle = reader.readBoolean("implicitSingle");
+ case 22:
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 27:
- last = reader.readBoolean("last");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 28:
- lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 29:
- miniId = reader.readIgniteUuid("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 30:
- near = reader.readBoolean("near");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 31:
- retVal = reader.readBoolean("retVal");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 32:
+ case 23:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -452,7 +371,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 33:
+ case 24:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -460,7 +379,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 34:
+ case 25:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -480,11 +399,24 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 35;
+ return 26;
}
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridNearTxPrepareRequest.class, this, super.toString());
+ StringBuilder flags = new StringBuilder();
+
+ if (near())
+ flags.append("near");
+ if (firstClientRequest())
+ flags.append("clientReq");
+ if (implicitSingle())
+ flags.append("single");
+ if (explicitLock())
+ flags.append("explicitLock");
+
+ return S.toString(GridNearTxPrepareRequest.class, this,
+ "flags", flags.toString(),
+ "super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index 8812709..66fe902 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -61,7 +61,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
private IgniteUuid futId;
/** Mini future ID. */
- private IgniteUuid miniId;
+ private int miniId;
/** DHT version. */
private GridCacheVersion dhtVer;
@@ -69,11 +69,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
/** Write version. */
private GridCacheVersion writeVer;
- /** */
- @GridToStringInclude
- @GridDirectCollection(int.class)
- private Collection<Integer> invalidParts;
-
/** Map of owned values to set on near node. */
@GridToStringInclude
@GridDirectTransient
@@ -107,6 +102,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param futId Future ID.
* @param miniId Mini future ID.
@@ -118,9 +114,10 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
* @param addDepInfo Deployment info flag.
*/
public GridNearTxPrepareResponse(
+ int part,
GridCacheVersion xid,
IgniteUuid futId,
- IgniteUuid miniId,
+ int miniId,
GridCacheVersion dhtVer,
GridCacheVersion writeVer,
GridCacheReturn retVal,
@@ -128,10 +125,9 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
AffinityTopologyVersion clientRemapVer,
boolean addDepInfo
) {
- super(xid, err, addDepInfo);
+ super(part, xid, err, addDepInfo);
assert futId != null;
- assert miniId != null;
assert dhtVer != null;
this.futId = futId;
@@ -145,7 +141,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
/**
* @return {@code True} if client node should remap transaction.
*/
- @Nullable public AffinityTopologyVersion clientRemapVersion() {
+ @Nullable AffinityTopologyVersion clientRemapVersion() {
return clientRemapVer;
}
@@ -170,7 +166,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
@@ -252,13 +248,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
return ownedVals != null && ownedVals.containsKey(key);
}
- /**
- * @return Invalid partitions.
- */
- public Collection<Integer> invalidPartitions() {
- return invalidParts;
- }
-
/** {@inheritDoc}
* @param ctx*/
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
@@ -355,67 +344,61 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
}
switch (writer.state()) {
- case 8:
+ case 10:
if (!writer.writeMessage("clientRemapVer", clientRemapVer))
return false;
writer.incrementState();
- case 9:
+ case 11:
if (!writer.writeMessage("dhtVer", dhtVer))
return false;
writer.incrementState();
- case 10:
+ case 12:
if (!writer.writeCollection("filterFailedKeys", filterFailedKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 11:
+ case 13:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 12:
- if (!writer.writeCollection("invalidParts", invalidParts, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ case 14:
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
- case 14:
+ case 15:
if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 15:
+ case 16:
if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 16:
+ case 17:
if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 17:
+ case 18:
if (!writer.writeMessage("retVal", retVal))
return false;
writer.incrementState();
- case 18:
+ case 19:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -437,7 +420,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
return false;
switch (reader.state()) {
- case 8:
+ case 10:
clientRemapVer = reader.readMessage("clientRemapVer");
if (!reader.isLastRead())
@@ -445,7 +428,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 9:
+ case 11:
dhtVer = reader.readMessage("dhtVer");
if (!reader.isLastRead())
@@ -453,7 +436,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 10:
+ case 12:
filterFailedKeys = reader.readCollection("filterFailedKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -461,7 +444,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 11:
+ case 13:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -469,23 +452,15 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 12:
- invalidParts = reader.readCollection("invalidParts", MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- miniId = reader.readIgniteUuid("miniId");
+ case 14:
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 14:
+ case 15:
ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -493,7 +468,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 15:
+ case 16:
ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -501,7 +476,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 16:
+ case 17:
pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -509,7 +484,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 17:
+ case 18:
retVal = reader.readMessage("retVal");
if (!reader.isLastRead())
@@ -517,7 +492,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
- case 18:
+ case 19:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -537,12 +512,11 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 19;
+ return 20;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNearTxPrepareResponse.class, this, "super", super.toString());
}
-
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java
index be78868..c32a844 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java
@@ -62,14 +62,6 @@ public class GridNearUnlockRequest extends GridDistributedUnlockRequest {
writer.onHeaderWritten();
}
- switch (writer.state()) {
- case 8:
- if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
- }
-
return true;
}
@@ -83,16 +75,6 @@ public class GridNearUnlockRequest extends GridDistributedUnlockRequest {
if (!super.readFrom(buf, reader))
return false;
- switch (reader.state()) {
- case 8:
- partIds = reader.readCollection("partIds", MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
- }
-
return reader.afterMessageRead(GridNearUnlockRequest.class);
}
@@ -103,7 +85,7 @@ public class GridNearUnlockRequest extends GridDistributedUnlockRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 9;
+ return 8;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsImpl.java
index 7dec7af..9373bc4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsImpl.java
@@ -50,7 +50,7 @@ public class IgniteTxMappingsImpl implements IgniteTxMappings {
/** {@inheritDoc} */
@Override public void put(GridDistributedTxMapping mapping) {
- mappings.put(mapping.node().id(), mapping);
+ mappings.put(mapping.primary().id(), mapping);
}
/** {@inheritDoc} */
@@ -61,7 +61,7 @@ public class IgniteTxMappingsImpl implements IgniteTxMappings {
/** {@inheritDoc} */
@Nullable @Override public GridDistributedTxMapping localMapping() {
for (GridDistributedTxMapping m : mappings.values()) {
- if (m.node().isLocal())
+ if (m.primary().isLocal())
return m;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsSingleImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsSingleImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsSingleImpl.java
index fc15592..b37f8d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsSingleImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsSingleImpl.java
@@ -44,7 +44,7 @@ public class IgniteTxMappingsSingleImpl implements IgniteTxMappings {
@Override public GridDistributedTxMapping get(UUID nodeId) {
GridDistributedTxMapping mapping0 = mapping;
- return (mapping0 != null && mapping0.node().id().equals(nodeId)) ? mapping0 : null;
+ return (mapping0 != null && mapping0.primary().id().equals(nodeId)) ? mapping0 : null;
}
/** {@inheritDoc} */
@@ -58,7 +58,7 @@ public class IgniteTxMappingsSingleImpl implements IgniteTxMappings {
@Override public GridDistributedTxMapping remove(UUID nodeId) {
GridDistributedTxMapping mapping0 = mapping;
- if (mapping0 != null && mapping0.node().id().equals(nodeId)) {
+ if (mapping0 != null && mapping0.primary().id().equals(nodeId)) {
this.mapping = null;
return mapping0;
@@ -71,7 +71,7 @@ public class IgniteTxMappingsSingleImpl implements IgniteTxMappings {
@Nullable @Override public GridDistributedTxMapping localMapping() {
GridDistributedTxMapping mapping0 = mapping;
- if (mapping0 != null && mapping0.node().isLocal())
+ if (mapping0 != null && mapping0.primary().isLocal())
return mapping0;
return null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 1691fd7..14a7ed0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -66,7 +66,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_ST
* {@link #equals(Object)} method, as transaction entries should use referential
* equality.
*/
-@IgniteCodeGeneratingFail // Field filters, partId should not be generated by MessageCodeGenerator.
+@IgniteCodeGeneratingFail // Field filters should not be generated by MessageCodeGenerator.
public class IgniteTxEntry implements GridPeerDeployAware, Message {
/** */
private static final long serialVersionUID = 0L;
@@ -99,9 +99,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
/** Cache ID. */
private int cacheId;
- /** Partition ID. */
- private int partId = -1;
-
/** Transient tx key. */
@GridDirectTransient
private IgniteTxKey txKey;
@@ -261,7 +258,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
keepBinary(keepBinary);
key = entry.key();
- partId = entry.key().partition();
cacheId = entry.context().cacheId();
}
@@ -314,7 +310,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
addEntryProcessor(entryProcessor, invokeArgs);
key = entry.key();
- partId = entry.key().partition();
cacheId = entry.context().cacheId();
}
@@ -348,7 +343,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
IgniteTxEntry cp = new IgniteTxEntry();
cp.key = key;
- cp.partId = partId;
cp.cacheId = cacheId;
cp.ctx = ctx;
@@ -935,8 +929,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
key.finishUnmarshal(context().cacheObjectContext(), clsLdr);
- key.partition(partId);
-
val.unmarshal(this.ctx, clsLdr);
if (expiryPlcBytes != null && expiryPlc == null)
@@ -1067,40 +1059,35 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
writer.incrementState();
case 8:
- if (!writer.writeMessage("serReadVer", serReadVer))
+ if (!writer.writeMessage("oldVal", oldVal))
return false;
writer.incrementState();
case 9:
- if (!writer.writeByteArray("transformClosBytes", transformClosBytes))
+ if (!writer.writeMessage("serReadVer", serReadVer))
return false;
writer.incrementState();
case 10:
- if (!writer.writeLong("ttl", ttl))
+ if (!writer.writeByteArray("transformClosBytes", transformClosBytes))
return false;
writer.incrementState();
case 11:
- if (!writer.writeMessage("val", val))
+ if (!writer.writeLong("ttl", ttl))
return false;
writer.incrementState();
case 12:
- if (!writer.writeInt("partId", partId))
+ if (!writer.writeMessage("val", val))
return false;
writer.incrementState();
- case 13:
- if (!writer.writeMessage("oldVal", oldVal))
- return false;
-
- writer.incrementState();
}
return true;
@@ -1179,7 +1166,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
case 8:
- serReadVer = reader.readMessage("serReadVer");
+ oldVal = reader.readMessage("oldVal");
if (!reader.isLastRead())
return false;
@@ -1187,7 +1174,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
case 9:
- transformClosBytes = reader.readByteArray("transformClosBytes");
+ serReadVer = reader.readMessage("serReadVer");
if (!reader.isLastRead())
return false;
@@ -1195,7 +1182,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
case 10:
- ttl = reader.readLong("ttl");
+ transformClosBytes = reader.readByteArray("transformClosBytes");
if (!reader.isLastRead())
return false;
@@ -1203,7 +1190,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
case 11:
- val = reader.readMessage("val");
+ ttl = reader.readLong("ttl");
if (!reader.isLastRead())
return false;
@@ -1211,20 +1198,13 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
case 12:
- partId = reader.readInt("partId", -1);
+ val = reader.readMessage("val");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 13:
- oldVal = reader.readMessage("oldVal");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
}
return reader.afterMessageRead(IgniteTxEntry.class);
@@ -1237,7 +1217,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 14;
+ return 13;
}
/** {@inheritDoc} */