You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/21 09:24:27 UTC
[36/71] [abbrv] ignite git commit: Changed tx mini future ids from
IgniteUuid to int, removed some legacy code from tx processing.
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 7199ede..bff69bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -72,13 +72,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
private IgniteUuid nearFutId;
/** Near future ID. */
- private IgniteUuid nearMiniId;
+ private int nearMiniId;
/** Near future ID. */
private IgniteUuid nearFinFutId;
/** Near future ID. */
- private IgniteUuid nearFinMiniId;
+ private int nearFinMiniId;
/** Near XID. */
private GridCacheVersion nearXidVer;
@@ -121,7 +121,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
UUID nearNodeId,
GridCacheVersion nearXidVer,
IgniteUuid nearFutId,
- IgniteUuid nearMiniId,
+ int nearMiniId,
long nearThreadId,
boolean implicit,
boolean implicitSingle,
@@ -159,7 +159,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
assert nearNodeId != null;
assert nearFutId != null;
- assert nearMiniId != null;
+ assert nearMiniId != 0;
assert nearXidVer != null;
this.nearNodeId = nearNodeId;
@@ -255,16 +255,9 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
/**
- * @return Near future mini ID.
- */
- public IgniteUuid nearFinishMiniId() {
- return nearFinMiniId;
- }
-
- /**
* @param nearFinMiniId Near future mini ID.
*/
- public void nearFinishMiniId(IgniteUuid nearFinMiniId) {
+ public void nearFinishMiniId(int nearFinMiniId) {
this.nearFinMiniId = nearFinMiniId;
}
@@ -394,7 +387,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
@Nullable Collection<IgniteTxEntry> writes,
Map<IgniteTxKey, GridCacheVersion> verMap,
long msgId,
- IgniteUuid nearMiniId,
+ int nearMiniId,
Map<UUID, Collection<UUID>> txNodes,
boolean last
) {
@@ -417,7 +410,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
needReturnValue()))) {
GridDhtTxPrepareFuture f = prepFut;
- assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " +
+ assert f.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " +
"[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']';
if (timeout == -1)
@@ -427,7 +420,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
}
else {
- assert fut.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " +
+ assert fut.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " +
"[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']';
// Prepare was called explicitly.
@@ -626,7 +619,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
"Invalid state [nearFinFutId=" + nearFinFutId + ", isInvalidate=" + isInvalidate() + ", commit=" + commit +
", sysInvalidate=" + isSystemInvalidate() + ", state=" + state() + ']';
- assert nearMiniId != null;
+ assert nearMiniId != 0;
return super.finish(commit);
}
@@ -641,8 +634,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
return;
}
- GridNearTxFinishResponse res = new GridNearTxFinishResponse(nearXidVer, threadId, nearFinFutId,
- nearFinMiniId, err);
+ GridNearTxFinishResponse res = new GridNearTxFinishResponse(
+ -1,
+ nearXidVer,
+ threadId,
+ nearFinFutId,
+ nearFinMiniId,
+ err);
try {
cctx.io().send(nearNodeId, res, ioPolicy());
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 1227ba9..56884ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -177,7 +177,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
private boolean trackable = true;
/** Near mini future id. */
- private IgniteUuid nearMiniId;
+ private int nearMiniId;
/** DHT versions map. */
private Map<IgniteTxKey, GridCacheVersion> dhtVerMap;
@@ -223,7 +223,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
GridCacheSharedContext cctx,
final GridDhtTxLocalAdapter tx,
long timeout,
- IgniteUuid nearMiniId,
+ int nearMiniId,
Map<IgniteTxKey, GridCacheVersion> dhtVerMap,
boolean last,
boolean retVal
@@ -263,7 +263,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/**
* @return Near mini future id.
*/
- public IgniteUuid nearMiniId() {
+ int nearMiniId() {
return nearMiniId;
}
@@ -562,7 +562,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @return Mini future.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- private MiniFuture miniFuture(IgniteUuid miniId) {
+ private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
synchronized (sync) {
int size = futuresCountNoLock();
@@ -576,7 +576,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
MiniFuture mini = (MiniFuture)fut;
- if (mini.futureId().equals(miniId)) {
+ if (mini.futureId() == miniId) {
if (!mini.isDone())
return mini;
else
@@ -856,9 +856,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
assert F.isEmpty(tx.invalidPartitions());
GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+ -1,
tx.nearXidVersion(),
tx.colocated() ? tx.xid() : tx.nearFutureId(),
- nearMiniId == null ? tx.xid() : nearMiniId,
+ nearMiniId,
tx.xidVersion(),
tx.writeVersion(),
ret,
@@ -1233,6 +1234,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
return;
if (last) {
+ int miniId = 0;
+
assert tx.transactionNodes() != null;
final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
@@ -1241,7 +1244,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) {
assert !dhtMapping.empty();
- ClusterNode n = dhtMapping.node();
+ ClusterNode n = dhtMapping.primary();
assert !n.isLocal();
@@ -1257,7 +1260,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (tx.remainingTime() == -1)
return;
- MiniFuture fut = new MiniFuture(n.id(), dhtMapping, nearMapping);
+ MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
add(fut); // Append new future.
@@ -1367,11 +1370,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
- if (!tx.dhtMap().containsKey(nearMapping.node().id())) {
+ if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
if (tx.remainingTime() == -1)
return;
- MiniFuture fut = new MiniFuture(nearMapping.node().id(), null, nearMapping);
+ MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping);
add(fut); // Append new future.
@@ -1417,12 +1420,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
assert req.transactionNodes() != null;
try {
- cctx.io().send(nearMapping.node(), req, tx.ioPolicy());
+ cctx.io().send(nearMapping.primary(), req, tx.ioPolicy());
if (msgLog.isDebugEnabled()) {
msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion() +
", dhtTxId=" + tx.xidVersion() +
- ", node=" + nearMapping.node().id() + ']');
+ ", node=" + nearMapping.primary().id() + ']');
}
}
catch (ClusterTopologyCheckedException ignored) {
@@ -1433,7 +1436,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (msgLog.isDebugEnabled()) {
msgLog.debug("DHT prepare fut, failed to send request near [txId=" + tx.nearXidVersion() +
", dhtTxId=" + tx.xidVersion() +
- ", node=" + nearMapping.node().id() + ']');
+ ", node=" + nearMapping.primary().id() + ']');
}
fut.onResult(e);
@@ -1442,7 +1445,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (msgLog.isDebugEnabled()) {
msgLog.debug("DHT prepare fut, failed to send request near, ignore [txId=" + tx.nearXidVersion() +
", dhtTxId=" + tx.xidVersion() +
- ", node=" + nearMapping.node().id() +
+ ", node=" + nearMapping.primary().id() +
", err=" + e + ']');
}
}
@@ -1479,27 +1482,37 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
while (true) {
try {
- Collection<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion());
+ List<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion());
+
+ assert dhtNodes.size() > 0 && dhtNodes.get(0).id().equals(cctx.localNodeId()) : dhtNodes;
if (log.isDebugEnabled())
log.debug("Mapping entry to DHT nodes [nodes=" + U.toShortString(dhtNodes) +
", entry=" + entry + ']');
- // Exclude local node.
- map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap);
+ for (int i = 1; i < dhtNodes.size(); i++) {
+ ClusterNode node = dhtNodes.get(i);
+
+ addMapping(entry, node, dhtMap);
+ }
Collection<UUID> readers = cached.readers();
if (!F.isEmpty(readers)) {
- Collection<ClusterNode> nearNodes =
- cctx.discovery().nodes(readers, F0.not(F.idForNodeId(tx.nearNodeId())));
+ for (UUID readerId : readers) {
+ if (readerId.equals(tx.nearNodeId()))
+ continue;
- if (log.isDebugEnabled())
- log.debug("Mapping entry to near nodes [nodes=" + U.toShortString(nearNodes) +
- ", entry=" + entry + ']');
+ ClusterNode readerNode = cctx.discovery().node(readerId);
+
+ if (readerNode == null || dhtNodes.contains(readerNode))
+ continue;
+
+ if (log.isDebugEnabled())
+ log.debug("Mapping entry to near node [node=" + readerNode + ", entry=" + entry + ']');
- // Exclude DHT nodes.
- map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap);
+ addMapping(entry, readerNode, nearMap);
+ }
}
else if (log.isDebugEnabled())
log.debug("Entry has no near readers: " + entry);
@@ -1516,39 +1529,35 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/**
* @param entry Entry.
- * @param nodes Nodes.
+ * @param n Node.
* @param globalMap Map.
*/
- private void map(
+ private void addMapping(
IgniteTxEntry entry,
- Iterable<ClusterNode> nodes,
+ ClusterNode n,
Map<UUID, GridDistributedTxMapping> globalMap
) {
- if (nodes != null) {
- for (ClusterNode n : nodes) {
- GridDistributedTxMapping global = globalMap.get(n.id());
+ GridDistributedTxMapping global = globalMap.get(n.id());
- if (!F.isEmpty(entry.entryProcessors())) {
- GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
- entry.cached().partition());
+ if (!F.isEmpty(entry.entryProcessors())) {
+ GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
+ entry.cached().partition());
- if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
- T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue();
+ if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
+ T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue();
- assert procVal != null : entry;
+ assert procVal != null : entry;
- entry.op(procVal.get1());
- entry.value(procVal.get2(), true, false);
- entry.entryProcessors(null);
- }
- }
-
- if (global == null)
- globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
-
- global.add(entry);
+ entry.op(procVal.get1());
+ entry.value(procVal.get2(), true, false);
+ entry.entryProcessors(null);
}
}
+
+ if (global == null)
+ globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
+
+ global.add(entry);
}
/**
@@ -1602,7 +1611,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
private static final long serialVersionUID = 0L;
/** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
+ private final int futId;
/** Node ID. */
private UUID nodeId;
@@ -1617,17 +1626,20 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/**
* @param nodeId Node ID.
+ * @param futId Future ID.
* @param dhtMapping Mapping.
* @param nearMapping nearMapping.
*/
MiniFuture(
UUID nodeId,
+ int futId,
GridDistributedTxMapping dhtMapping,
GridDistributedTxMapping nearMapping
) {
- assert dhtMapping == null || nearMapping == null || dhtMapping.node().equals(nearMapping.node());
+ assert dhtMapping == null || nearMapping == null || dhtMapping.primary().equals(nearMapping.primary());
this.nodeId = nodeId;
+ this.futId = futId;
this.dhtMapping = dhtMapping;
this.nearMapping = nearMapping;
}
@@ -1635,7 +1647,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/**
* @return Future ID.
*/
- IgniteUuid futureId() {
+ int futureId() {
return futId;
}
@@ -1643,7 +1655,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @return Node ID.
*/
public ClusterNode node() {
- return dhtMapping != null ? dhtMapping.node() : nearMapping.node();
+ return dhtMapping != null ? dhtMapping.primary() : nearMapping.primary();
}
/**
@@ -1689,7 +1701,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
try {
GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
- cached.removeReader(nearMapping.node().id(), res.messageId());
+ cached.removeReader(nearMapping.primary().id(), res.messageId());
break;
}
@@ -1709,22 +1721,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
// Process invalid partitions (no need to remap).
- // Keep this loop for backward compatibility.
- if (!F.isEmpty(res.invalidPartitions())) {
- for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) {
- IgniteTxEntry entry = it.next();
-
- if (res.invalidPartitions().contains(entry.cached().partition())) {
- it.remove();
-
- if (log.isDebugEnabled())
- log.debug("Removed mapping for entry from dht mapping [key=" + entry.key() +
- ", tx=" + tx + ", dhtMapping=" + dhtMapping + ']');
- }
- }
- }
-
- // Process invalid partitions (no need to remap).
if (!F.isEmpty(res.invalidPartitionsByCacheId())) {
Map<Integer, int[]> invalidPartsMap = res.invalidPartitionsByCacheId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index a8f2087..8898803 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -52,9 +52,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** */
private static final long serialVersionUID = 0L;
- /** */
- public static final int NEED_RETURN_VALUE_FLAG_MASK = 0x01;
-
/** Max order. */
private UUID nearNodeId;
@@ -62,7 +59,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
private IgniteUuid futId;
/** Mini future ID. */
- private IgniteUuid miniId;
+ private int miniId;
/** Topology version. */
private AffinityTopologyVersion topVer;
@@ -91,9 +88,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** Near transaction ID. */
private GridCacheVersion nearXidVer;
- /** {@code True} if this is last prepare request for node. */
- private boolean last;
-
/** Subject ID. */
private UUID subjId;
@@ -103,9 +97,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** Preload keys. */
private BitSet preloadKeys;
- /** */
- private byte flags;
-
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -129,7 +120,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
*/
public GridDhtTxPrepareRequest(
IgniteUuid futId,
- IgniteUuid miniId,
+ int miniId,
AffinityTopologyVersion topVer,
GridDhtTxLocalAdapter tx,
long timeout,
@@ -143,17 +134,24 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
int taskNameHash,
boolean addDepInfo,
boolean retVal) {
- super(tx, timeout, null, dhtWrites, txNodes, onePhaseCommit, addDepInfo);
+ super(tx,
+ timeout,
+ null,
+ dhtWrites,
+ txNodes,
+ retVal,
+ last,
+ onePhaseCommit,
+ addDepInfo);
assert futId != null;
- assert miniId != null;
+ assert miniId != 0;
this.topVer = topVer;
this.futId = futId;
this.nearWrites = nearWrites;
this.miniId = miniId;
this.nearXidVer = nearXidVer;
- this.last = last;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
@@ -165,30 +163,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
}
/**
- * @return Flag indicating whether transaction needs return value.
- */
- public boolean needReturnValue() {
- return (flags & NEED_RETURN_VALUE_FLAG_MASK) != 0;
- }
-
- /**
- * @param retVal Need return value.
- */
- public void needReturnValue(boolean retVal) {
- if (retVal)
- flags = (byte)(flags | NEED_RETURN_VALUE_FLAG_MASK);
- else
- flags &= ~NEED_RETURN_VALUE_FLAG_MASK;
- }
-
- /**
- * @return {@code True} if this is last prepare request for node.
- */
- public boolean last() {
- return last;
- }
-
- /**
* @return Near transaction ID.
*/
public GridCacheVersion nearXidVersion() {
@@ -227,7 +201,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
* @param idx Entry index to set invalidation flag.
* @param invalidate Invalidation flag value.
*/
- public void invalidateNearEntry(int idx, boolean invalidate) {
+ void invalidateNearEntry(int idx, boolean invalidate) {
invalidateNearEntries.set(idx, invalidate);
}
@@ -244,7 +218,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
*
* @param idx Key index.
*/
- public void markKeyForPreload(int idx) {
+ void markKeyForPreload(int idx) {
if (preloadKeys == null)
preloadKeys = new BitSet();
@@ -271,7 +245,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
@@ -374,85 +348,73 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
}
switch (writer.state()) {
- case 23:
- if (!writer.writeByte("flags", flags))
- return false;
-
- writer.incrementState();
-
- case 24:
+ case 20:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 25:
+ case 21:
if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries))
return false;
writer.incrementState();
- case 26:
- if (!writer.writeBoolean("last", last))
- return false;
-
- writer.incrementState();
-
- case 27:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ case 22:
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
- case 28:
+ case 23:
if (!writer.writeUuid("nearNodeId", nearNodeId))
return false;
writer.incrementState();
- case 29:
+ case 24:
if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 30:
+ case 25:
if (!writer.writeMessage("nearXidVer", nearXidVer))
return false;
writer.incrementState();
- case 31:
+ case 26:
if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 32:
+ case 27:
if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 33:
+ case 28:
if (!writer.writeBitSet("preloadKeys", preloadKeys))
return false;
writer.incrementState();
- case 34:
+ case 29:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 35:
+ case 30:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 36:
+ case 31:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -474,15 +436,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
return false;
switch (reader.state()) {
- case 23:
- flags = reader.readByte("flags");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 24:
+ case 20:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -490,7 +444,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 25:
+ case 21:
invalidateNearEntries = reader.readBitSet("invalidateNearEntries");
if (!reader.isLastRead())
@@ -498,23 +452,15 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 26:
- last = reader.readBoolean("last");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 27:
- miniId = reader.readIgniteUuid("miniId");
+ case 22:
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 28:
+ case 23:
nearNodeId = reader.readUuid("nearNodeId");
if (!reader.isLastRead())
@@ -522,7 +468,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 29:
+ case 24:
nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -530,7 +476,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 30:
+ case 25:
nearXidVer = reader.readMessage("nearXidVer");
if (!reader.isLastRead())
@@ -538,7 +484,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 31:
+ case 26:
ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -546,7 +492,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 32:
+ case 27:
ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -554,7 +500,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 33:
+ case 28:
preloadKeys = reader.readBitSet("preloadKeys");
if (!reader.isLastRead())
@@ -562,7 +508,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 34:
+ case 29:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -570,7 +516,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 35:
+ case 30:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -578,7 +524,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 36:
+ case 31:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -598,6 +544,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 37;
+ return 32;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index 2eba9f1..fba68ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -58,16 +58,11 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
private IgniteUuid futId;
/** Mini future ID. */
- private IgniteUuid miniId;
-
- /** Invalid partitions. */
- @GridToStringInclude
- @GridDirectCollection(int.class)
- private Collection<Integer> invalidParts;
+ private int miniId;
/** Invalid partitions by cache ID. */
@GridDirectMap(keyType = Integer.class, valueType = int[].class)
- private Map<Integer, int[]> invalidPartsByCacheId;
+ private Map<Integer, int[]> invalidParts;
/** Preload entries. */
@GridDirectCollection(GridCacheEntryInfo.class)
@@ -81,34 +76,46 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param futId Future ID.
* @param miniId Mini future ID.
* @param addDepInfo Deployment info flag.
*/
- public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, boolean addDepInfo) {
- super(xid, addDepInfo);
+ public GridDhtTxPrepareResponse(
+ int part,
+ GridCacheVersion xid,
+ IgniteUuid futId,
+ int miniId,
+ boolean addDepInfo) {
+ super(part, xid, addDepInfo);
assert futId != null;
- assert miniId != null;
+ assert miniId != 0;
this.futId = futId;
this.miniId = miniId;
}
/**
+ * @param part Partition.
* @param xid Xid version.
* @param futId Future ID.
* @param miniId Mini future ID.
* @param err Error.
* @param addDepInfo Deployment enabled.
*/
- public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, Throwable err,
+ public GridDhtTxPrepareResponse(
+ int part,
+ GridCacheVersion xid,
+ IgniteUuid futId,
+ int miniId,
+ Throwable err,
boolean addDepInfo) {
- super(xid, err, addDepInfo);
+ super(part, xid, err, addDepInfo);
assert futId != null;
- assert miniId != null;
+ assert miniId != 0;
this.futId = futId;
this.miniId = miniId;
@@ -117,7 +124,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
/**
* @return Evicted readers.
*/
- public Collection<IgniteTxKey> nearEvicted() {
+ Collection<IgniteTxKey> nearEvicted() {
return nearEvicted;
}
@@ -138,36 +145,22 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
/**
- * @return Invalid partitions.
- */
- public Collection<Integer> invalidPartitions() {
- return invalidParts;
- }
-
- /**
- * @param invalidParts Invalid partitions.
- */
- public void invalidPartitions(Collection<Integer> invalidParts) {
- this.invalidParts = invalidParts;
- }
-
- /**
* @return Map from cacheId to an array of invalid partitions.
*/
- public Map<Integer, int[]> invalidPartitionsByCacheId() {
- return invalidPartsByCacheId;
+ Map<Integer, int[]> invalidPartitionsByCacheId() {
+ return invalidParts;
}
/**
* @param invalidPartsByCacheId Map from cache ID to an array of invalid partitions.
*/
public void invalidPartitionsByCacheId(Map<Integer, Set<Integer>> invalidPartsByCacheId) {
- this.invalidPartsByCacheId = CU.convertInvalidPartitions(invalidPartsByCacheId);
+ this.invalidParts = CU.convertInvalidPartitions(invalidPartsByCacheId);
}
/**
@@ -175,7 +168,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
*
* @return Collection of entry infos need to be preloaded.
*/
- public Collection<GridCacheEntryInfo> preloadEntries() {
+ Collection<GridCacheEntryInfo> preloadEntries() {
return preloadEntries == null ? Collections.<GridCacheEntryInfo>emptyList() : preloadEntries;
}
@@ -193,8 +186,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
preloadEntries.add(info);
}
- /** {@inheritDoc}
- * @param ctx*/
+ /** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
@@ -237,11 +229,6 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
}
/** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtTxPrepareResponse.class, this, "super", super.toString());
- }
-
- /** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -256,37 +243,31 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
}
switch (writer.state()) {
- case 8:
+ case 10:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 9:
- if (!writer.writeCollection("invalidParts", invalidParts, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeMap("invalidPartsByCacheId", invalidPartsByCacheId, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
+ case 11:
+ if (!writer.writeMap("invalidParts", invalidParts, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
return false;
writer.incrementState();
- case 11:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ case 12:
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
- case 12:
+ case 13:
if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 13:
+ case 14:
if (!writer.writeCollection("preloadEntries", preloadEntries, MessageCollectionItemType.MSG))
return false;
@@ -308,7 +289,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
return false;
switch (reader.state()) {
- case 8:
+ case 10:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -316,31 +297,23 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
- case 9:
- invalidParts = reader.readCollection("invalidParts", MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- invalidPartsByCacheId = reader.readMap("invalidPartsByCacheId", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
+ case 11:
+ invalidParts = reader.readMap("invalidParts", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 11:
- miniId = reader.readIgniteUuid("miniId");
+ case 12:
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 12:
+ case 13:
nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -348,7 +321,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
- case 13:
+ case 14:
preloadEntries = reader.readCollection("preloadEntries", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -368,6 +341,12 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 14;
+ return 15;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtTxPrepareResponse.class, this,
+ "super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java
index 3737295..752df54 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java
@@ -119,11 +119,6 @@ public class GridDhtUnlockRequest extends GridDistributedUnlockRequest {
writer.incrementState();
- case 9:
- if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
}
return true;
@@ -148,14 +143,6 @@ public class GridDhtUnlockRequest extends GridDistributedUnlockRequest {
reader.incrementState();
- case 9:
- partIds = reader.readCollection("partIds", MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
}
return reader.afterMessageRead(GridDhtUnlockRequest.class);
@@ -168,6 +155,6 @@ public class GridDhtUnlockRequest extends GridDistributedUnlockRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 10;
+ return 9;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 79ca108..0ce380d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -160,6 +160,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
/** Keep binary. */
private final boolean keepBinary;
+ /** */
+ private int miniId;
+
/**
* @param cctx Registry.
* @param keys Keys to lock.
@@ -485,7 +488,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
* @return Mini future.
*/
@SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"})
- private MiniFuture miniFuture(IgniteUuid miniId) {
+ private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
synchronized (sync) {
int size = futuresCountNoLock();
@@ -499,7 +502,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
MiniFuture mini = (MiniFuture)fut;
- if (mini.futureId().equals(miniId)) {
+ if (mini.futureId() == miniId) {
if (!mini.isDone())
return mini;
else
@@ -1049,7 +1052,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
if (node.isLocal())
lockLocally(mappedKeys, req.topologyVersion());
else {
- final MiniFuture fut = new MiniFuture(node, mappedKeys);
+ final MiniFuture fut = new MiniFuture(node, mappedKeys, ++miniId);
req.miniId(fut.futureId());
@@ -1393,7 +1396,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
private static final long serialVersionUID = 0L;
/** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
+ private final int futId;
/** Node ID. */
@GridToStringExclude
@@ -1409,19 +1412,22 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
/**
* @param node Node.
* @param keys Keys.
+ * @param futId Mini future ID.
*/
MiniFuture(
ClusterNode node,
- Collection<KeyCacheObject> keys
+ Collection<KeyCacheObject> keys,
+ int futId
) {
this.node = node;
this.keys = keys;
+ this.futId = futId;
}
/**
* @return Future ID.
*/
- IgniteUuid futureId() {
+ int futureId() {
return futId;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index 7ca2635..79c71b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -73,10 +73,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
@GridDirectCollection(KeyCacheObject.class)
private List<KeyCacheObject> keys;
- /** Partition IDs. */
- @GridDirectCollection(int.class)
- private List<Integer> partIds;
-
/** */
@GridDirectCollection(boolean.class)
private Collection<Boolean> flags;
@@ -154,12 +150,10 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
this.keys = new ArrayList<>(keys.size());
flags = new ArrayList<>(keys.size());
- partIds = new ArrayList<>(keys.size());
for (Map.Entry<KeyCacheObject, Boolean> entry : keys.entrySet()) {
this.keys.add(entry.getKey());
flags.add(entry.getValue());
- partIds.add(entry.getKey().partition());
}
this.readThrough = readThrough;
@@ -259,7 +253,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
/** {@inheritDoc} */
@Override public int partition() {
- return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+ return keys != null && !keys.isEmpty() ? keys.get(0).partition() : -1;
}
/**
@@ -302,13 +296,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
while (keysIt.hasNext())
keyMap.put(keysIt.next(), flagsIt.next());
}
-
- if (partIds != null && !partIds.isEmpty()) {
- assert partIds.size() == keys.size();
-
- for (int i = 0; i < keys.size(); i++)
- keys.get(i).partition(partIds.get(i));
- }
}
/** {@inheritDoc} */
@@ -368,48 +355,42 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
writer.incrementState();
case 9:
- if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 10:
if (!writer.writeBoolean("readThrough", readThrough))
return false;
writer.incrementState();
- case 11:
+ case 10:
if (!writer.writeBoolean("reload", reload))
return false;
writer.incrementState();
- case 12:
+ case 11:
if (!writer.writeBoolean("skipVals", skipVals))
return false;
writer.incrementState();
- case 13:
+ case 12:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 14:
+ case 13:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 15:
+ case 14:
if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
- case 16:
+ case 15:
if (!writer.writeMessage("ver", ver))
return false;
@@ -480,14 +461,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
reader.incrementState();
case 9:
- partIds = reader.readCollection("partIds", MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
readThrough = reader.readBoolean("readThrough");
if (!reader.isLastRead())
@@ -495,7 +468,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
reader.incrementState();
- case 11:
+ case 10:
reload = reader.readBoolean("reload");
if (!reader.isLastRead())
@@ -503,7 +476,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
reader.incrementState();
- case 12:
+ case 11:
skipVals = reader.readBoolean("skipVals");
if (!reader.isLastRead())
@@ -511,7 +484,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
reader.incrementState();
- case 13:
+ case 12:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -519,7 +492,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
reader.incrementState();
- case 14:
+ case 13:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -527,7 +500,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
reader.incrementState();
- case 15:
+ case 14:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -535,7 +508,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
reader.incrementState();
- case 16:
+ case 15:
ver = reader.readMessage("ver");
if (!reader.isLastRead())
@@ -555,7 +528,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 17;
+ return 16;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index d3e3a15..ffc84d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -164,6 +164,9 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
/** Keep binary context flag. */
private final boolean keepBinary;
+ /** */
+ private int miniId;
+
/**
* @param cctx Registry.
* @param keys Keys to lock.
@@ -532,7 +535,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
* @return Mini future.
*/
@SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"})
- private MiniFuture miniFuture(IgniteUuid miniId) {
+ private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
synchronized (sync) {
int size = futuresCountNoLock();
@@ -546,7 +549,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
MiniFuture mini = (MiniFuture)fut;
- if (mini.futureId().equals(miniId)) {
+ if (mini.futureId() == miniId) {
if (!mini.isDone())
return mini;
else
@@ -1178,7 +1181,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
req.filter(filter, cctx);
if (node.isLocal()) {
- req.miniId(IgniteUuid.randomUuid());
+ req.miniId(-1);
if (log.isDebugEnabled())
log.debug("Before locally locking near request: " + req);
@@ -1316,7 +1319,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
fut));
}
else {
- final MiniFuture fut = new MiniFuture(node, mappedKeys);
+ final MiniFuture fut = new MiniFuture(node, mappedKeys, ++miniId);
req.miniId(fut.futureId());
@@ -1489,7 +1492,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
private static final long serialVersionUID = 0L;
/** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
+ private final int futId;
/** Node ID. */
@GridToStringExclude
@@ -1505,19 +1508,22 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
/**
* @param node Node.
* @param keys Keys.
+ * @param futId Mini future ID.
*/
MiniFuture(
ClusterNode node,
- Collection<KeyCacheObject> keys
+ Collection<KeyCacheObject> keys,
+ int futId
) {
this.node = node;
this.keys = keys;
+ this.futId = futId;
}
/**
* @return Future ID.
*/
- IgniteUuid futureId() {
+ int futureId() {
return futId;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index 9e12153..229961e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -50,7 +50,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
private AffinityTopologyVersion topVer;
/** Mini future ID. */
- private IgniteUuid miniId;
+ private int miniId;
/** Filter. */
private CacheEntryPredicate[] filter;
@@ -256,14 +256,14 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
/**
* @param miniId Mini future Id.
*/
- public void miniId(IgniteUuid miniId) {
+ public void miniId(int miniId) {
this.miniId = miniId;
}
@@ -423,7 +423,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
writer.incrementState();
case 28:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
@@ -464,12 +464,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
writer.incrementState();
- case 35:
- if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
}
return true;
@@ -551,7 +545,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
case 28:
- miniId = reader.readIgniteUuid("miniId");
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
@@ -606,14 +600,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
- case 35:
- partIds = reader.readCollection("partIds", MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
}
return reader.afterMessageRead(GridNearLockRequest.class);
@@ -626,7 +612,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 36;
+ return 35;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
index e48a098..b10591d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
@@ -47,7 +47,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
private Collection<GridCacheVersion> pending;
/** */
- private IgniteUuid miniId;
+ private int miniId;
/** DHT versions. */
@GridToStringInclude
@@ -85,7 +85,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
int cacheId,
GridCacheVersion lockVer,
IgniteUuid futId,
- IgniteUuid miniId,
+ int miniId,
boolean filterRes,
int cnt,
Throwable err,
@@ -94,7 +94,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
) {
super(cacheId, lockVer, futId, cnt, err, addDepInfo);
- assert miniId != null;
+ assert miniId != 0;
this.miniId = miniId;
this.clientRemapVer = clientRemapVer;
@@ -134,7 +134,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
@@ -233,7 +233,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
writer.incrementState();
case 14:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
@@ -293,7 +293,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
reader.incrementState();
case 14:
- miniId = reader.readIgniteUuid("miniId");
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index dbc8096..80508dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -56,9 +56,7 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteReducer;
-import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -70,9 +68,6 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING;
*/
public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter {
/** */
- public static final IgniteProductVersion SER_TX_SINCE = IgniteProductVersion.fromString("1.5.0");
-
- /** */
@GridToStringExclude
private KeyLockFuture keyLockFut;
@@ -80,6 +75,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
@GridToStringExclude
private ClientRemapFuture remapFut;
+ /** */
+ private int miniId;
+
/**
* @param cctx Context.
* @param tx Transaction.
@@ -153,7 +151,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
if (isMini(fut)) {
MiniFuture f = (MiniFuture) fut;
- if (f.node().id().equals(nodeId)) {
+ if (f.primary().id().equals(nodeId)) {
ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " +
nodeId);
@@ -186,7 +184,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
if (e instanceof IgniteTxOptimisticCheckedException || e instanceof IgniteTxTimeoutCheckedException) {
if (m != null)
- tx.removeMapping(m.node().id());
+ tx.removeMapping(m.primary().id());
}
ERR_UPD.compareAndSet(this, null, e);
@@ -227,7 +225,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
* @return Mini future.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- private MiniFuture miniFuture(IgniteUuid miniId) {
+ private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
synchronized (sync) {
int size = futuresCountNoLock();
@@ -241,7 +239,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
MiniFuture mini = (MiniFuture)fut;
- if (mini.futureId().equals(miniId)) {
+ if (mini.futureId() == miniId) {
if (!mini.isDone())
return mini;
else
@@ -339,15 +337,15 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
assert topVer.topologyVersion() > 0;
- txMapping = new GridDhtTxMapping();
+ GridDhtTxMapping txMapping = new GridDhtTxMapping();
Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
for (IgniteTxEntry write : writes)
- map(write, topVer, mappings, remap, topLocked);
+ map(write, topVer, mappings, txMapping, remap, topLocked);
for (IgniteTxEntry read : reads)
- map(read, topVer, mappings, remap, topLocked);
+ map(read, topVer, mappings, txMapping, remap, topLocked);
if (keyLockFut != null)
keyLockFut.onAllKeysAdded();
@@ -365,12 +363,12 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
tx.transactionNodes(txMapping.transactionNodes());
- checkOnePhase();
+ checkOnePhase(txMapping);
for (GridDistributedTxMapping m : mappings.values()) {
assert !m.empty();
- add(new MiniFuture(this, m));
+ add(new MiniFuture(this, m, ++miniId));
}
Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
@@ -385,7 +383,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
MiniFuture fut = (MiniFuture)fut0;
- IgniteCheckedException err = prepare(fut);
+ IgniteCheckedException err = prepare(fut, txMapping);
if (err != null) {
while (it.hasNext()) {
@@ -396,7 +394,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
fut = (MiniFuture)fut0;
- tx.removeMapping(fut.mapping().node().id());
+ tx.removeMapping(fut.mapping().primary().id());
fut.onResult(new IgniteCheckedException("Failed to prepare transaction.", err));
}
@@ -421,10 +419,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
* @param fut Mini future.
* @return Prepare error if any.
*/
- @Nullable private IgniteCheckedException prepare(final MiniFuture fut) {
+ @Nullable private IgniteCheckedException prepare(final MiniFuture fut, GridDhtTxMapping txMapping) {
GridDistributedTxMapping m = fut.mapping();
- final ClusterNode n = m.node();
+ final ClusterNode primary = m.primary();
long timeout = tx.remainingTime();
@@ -477,8 +475,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
req.miniId(fut.futureId());
// If this is the primary node for the keys.
- if (n.isLocal()) {
- IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req);
+ if (primary.isLocal()) {
+ IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(primary.id(),
+ tx,
+ req);
prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
@Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
@@ -493,7 +493,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
else {
try {
- cctx.io().send(n, req, tx.ioPolicy());
+ cctx.io().send(primary, req, tx.ioPolicy());
}
catch (ClusterTopologyCheckedException e) {
e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
@@ -523,6 +523,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
IgniteTxEntry entry,
AffinityTopologyVersion topVer,
Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> curMapping,
+ GridDhtTxMapping txMapping,
boolean remap,
boolean topLocked
) {
@@ -544,13 +545,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']');
}
- if (primary.version().compareTo(SER_TX_SINCE) < 0) {
- onDone(new IgniteCheckedException("Optimistic serializable transactions can be used only with node " +
- "version starting from " + SER_TX_SINCE));
-
- return;
- }
-
// Must re-initialize cached entry while holding topology lock.
if (cacheCtx.isNear())
entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer));
@@ -626,8 +620,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
Collection<String> futs = F.viewReadOnly(futures(),
new C1<IgniteInternalFuture<?>, String>() {
@Override public String apply(IgniteInternalFuture<?> f) {
- return "[node=" + ((MiniFuture)f).node().id() +
- ", loc=" + ((MiniFuture)f).node().isLocal() +
+ return "[node=" + ((MiniFuture)f).primary().id() +
+ ", loc=" + ((MiniFuture)f).primary().isLocal() +
", done=" + f.isDone() + "]";
}
},
@@ -654,7 +648,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
/**
* Constructor.
*/
- public ClientRemapFuture() {
+ ClientRemapFuture() {
super(new ClientRemapFutureReducer());
}
}
@@ -697,7 +691,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes");
/** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
+ private final int futId;
/** Parent future. */
private final GridNearOptimisticSerializableTxPrepareFuture parent;
@@ -713,24 +707,26 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
/**
* @param parent Parent future.
* @param m Mapping.
+ * @param futId Mini future ID.
*/
- MiniFuture(GridNearOptimisticSerializableTxPrepareFuture parent, GridDistributedTxMapping m) {
+ MiniFuture(GridNearOptimisticSerializableTxPrepareFuture parent, GridDistributedTxMapping m, int futId) {
this.parent = parent;
this.m = m;
+ this.futId = futId;
}
/**
* @return Future ID.
*/
- IgniteUuid futureId() {
+ int futureId() {
return futId;
}
/**
- * @return Node ID.
+ * @return Primary node.
*/
- public ClusterNode node() {
- return m.node();
+ public ClusterNode primary() {
+ return m.primary();
}
/**
@@ -795,7 +791,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
assert parent.cctx.kernalContext().clientNode();
assert m.clientFirst();
- parent.tx.removeMapping(m.node().id());
+ parent.tx.removeMapping(m.primary().id());
ClientRemapFuture remapFut0 = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index a2cb182..6189b38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -58,7 +58,6 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
-import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.transactions.TransactionTimeoutException;
import org.jetbrains.annotations.Nullable;
@@ -75,6 +74,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
@GridToStringExclude
private KeyLockFuture keyLockFut;
+ /** */
+ private int miniId;
+
+ /** */
+ private GridDhtTxMapping txMapping;
+
/**
* @param cctx Context.
* @param tx Transaction.
@@ -232,7 +237,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
* @return Mini future.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- private MiniFuture miniFuture(IgniteUuid miniId) {
+ private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
synchronized (sync) {
int size = futuresCountNoLock();
@@ -246,7 +251,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
MiniFuture mini = (MiniFuture)fut;
- if (mini.futureId().equals(miniId)) {
+ if (mini.futureId() == miniId) {
if (!mini.isDone())
return mini;
else
@@ -352,7 +357,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
GridDistributedTxMapping mapping = map(write, topVer, null, topLocked, remap);
- if (mapping.node().isLocal()) {
+ if (mapping.primary().isLocal()) {
if (write.context().isNear())
tx.nearLocallyMapped(true);
else if (write.context().isColocated())
@@ -377,7 +382,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
tx.transactionNodes(txMapping.transactionNodes());
- checkOnePhase();
+ checkOnePhase(txMapping);
proceedPrepare(mapping, null);
}
@@ -414,12 +419,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
updated.last(true);
- GridDistributedTxMapping prev = map.put(updated.node().id(), updated);
+ GridDistributedTxMapping prev = map.put(updated.primary().id(), updated);
if (prev != null)
prev.last(false);
- if (updated.node().isLocal()) {
+ if (updated.primary().isLocal()) {
if (write.context().isNear())
tx.nearLocallyMapped(true);
else if (write.context().isColocated())
@@ -446,7 +451,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
tx.transactionNodes(txMapping.transactionNodes());
- checkOnePhase();
+ checkOnePhase(txMapping);
proceedPrepare(mappings);
}
@@ -480,7 +485,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
try {
assert !m.empty();
- final ClusterNode n = m.node();
+ final ClusterNode n = m.primary();
long timeout = tx.remainingTime();
@@ -521,7 +526,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
}
- final MiniFuture fut = new MiniFuture(this, m, mappings);
+ final MiniFuture fut = new MiniFuture(this, m, ++miniId, mappings);
req.miniId(fut.futureId());
@@ -639,7 +644,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
}
- if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
+ if (cur == null || !cur.primary().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
boolean clientFirst = cur == null && !topLocked && cctx.kernalContext().clientNode();
cur = new GridDistributedTxMapping(primary);
@@ -771,7 +776,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
private final GridNearOptimisticTxPrepareFuture parent;
/** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
+ private final int futId;
/** Keys. */
@GridToStringInclude
@@ -787,19 +792,23 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
/**
* @param parent Parent.
* @param m Mapping.
+ * @param futId Mini future ID.
* @param mappings Queue of mappings to proceed with.
*/
- MiniFuture(GridNearOptimisticTxPrepareFuture parent, GridDistributedTxMapping m,
+ MiniFuture(GridNearOptimisticTxPrepareFuture parent,
+ GridDistributedTxMapping m,
+ int futId,
Queue<GridDistributedTxMapping> mappings) {
this.parent = parent;
this.m = m;
+ this.futId = futId;
this.mappings = mappings;
}
/**
* @return Future ID.
*/
- IgniteUuid futureId() {
+ int futureId() {
return futId;
}
@@ -807,7 +816,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
* @return Node ID.
*/
public ClusterNode node() {
- return m.node();
+ return m.primary();
}
/**
@@ -840,7 +849,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
void onNodeLeft(ClusterTopologyCheckedException e, boolean discoThread) {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near optimistic prepare fut, mini future node left [txId=" + parent.tx.nearXidVersion() +
- ", node=" + m.node().id() + ']');
+ ", node=" + m.primary().id() + ']');
}
if (isDone())