You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/10 13:55:26 UTC
[2/8] ignite git commit: ignite-4768
ignite-4768
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d3d4a36b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d3d4a36b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d3d4a36b
Branch: refs/heads/ignite-4768-1
Commit: d3d4a36b4f0c71b5635dadd9211f73e728e29483
Parents: 784b171
Author: sboikov <sb...@gridgain.com>
Authored: Thu Mar 9 16:38:54 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 9 16:38:54 2017 +0300
----------------------------------------------------------------------
.../distributed/dht/GridDhtTxPrepareFuture.java | 85 +++++++++++---------
.../dht/GridDhtTxPrepareRequest.java | 12 +--
.../dht/GridDhtTxPrepareResponse.java | 20 ++---
3 files changed, 64 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d3d4a36b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index d093b4a..735653d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -562,7 +562,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @return Mini future.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- private MiniFuture miniFuture(IgniteUuid miniId) {
+ private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
synchronized (sync) {
int size = futuresCountNoLock();
@@ -576,7 +576,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
MiniFuture mini = (MiniFuture)fut;
- if (mini.futureId().equals(miniId)) {
+ if (mini.futureId() == miniId) {
if (!mini.isDone())
return mini;
else
@@ -1233,6 +1233,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
return;
if (last) {
+ int miniId = 0;
+
assert tx.transactionNodes() != null;
final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
@@ -1257,7 +1259,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (tx.remainingTime() == -1)
return;
- MiniFuture fut = new MiniFuture(n.id(), dhtMapping, nearMapping);
+ MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
add(fut); // Append new future.
@@ -1371,7 +1373,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (tx.remainingTime() == -1)
return;
- MiniFuture fut = new MiniFuture(nearMapping.primary().id(), null, nearMapping);
+ MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping);
add(fut); // Append new future.
@@ -1481,25 +1483,35 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
try {
List<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion());
+ assert dhtNodes.size() > 0 && dhtNodes.get(0).id().equals(cctx.localNodeId()) : dhtNodes;
+
if (log.isDebugEnabled())
log.debug("Mapping entry to DHT nodes [nodes=" + U.toShortString(dhtNodes) +
", entry=" + entry + ']');
- // Exclude local node.
- map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap);
+ for (int i = 1; i < dhtNodes.size(); i++) {
+ ClusterNode node = dhtNodes.get(i);
+
+ addMapping(entry, node, dhtMap);
+ }
Collection<UUID> readers = cached.readers();
if (!F.isEmpty(readers)) {
- Collection<ClusterNode> nearNodes =
- cctx.discovery().nodes(readers, F0.not(F.idForNodeId(tx.nearNodeId())));
+ for (UUID readerId : readers) {
+ if (readerId.equals(tx.nearNodeId()))
+ continue;
- if (log.isDebugEnabled())
- log.debug("Mapping entry to near nodes [nodes=" + U.toShortString(nearNodes) +
- ", entry=" + entry + ']');
+ ClusterNode readerNode = cctx.discovery().node(readerId);
+
+ if (readerNode == null || dhtNodes.contains(readerNode))
+ continue;
+
+ if (log.isDebugEnabled())
+ log.debug("Mapping entry to near node [node=" + readerNode + ", entry=" + entry + ']');
- // Exclude DHT nodes.
- map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap);
+ addMapping(entry, readerNode, nearMap);
+ }
}
else if (log.isDebugEnabled())
log.debug("Entry has no near readers: " + entry);
@@ -1516,39 +1528,35 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/**
* @param entry Entry.
- * @param nodes Nodes.
+ * @param n Node.
* @param globalMap Map.
*/
- private void map(
+ private void addMapping(
IgniteTxEntry entry,
- Iterable<ClusterNode> nodes,
+ ClusterNode n,
Map<UUID, GridDistributedTxMapping> globalMap
) {
- if (nodes != null) {
- for (ClusterNode n : nodes) {
- GridDistributedTxMapping global = globalMap.get(n.id());
-
- if (!F.isEmpty(entry.entryProcessors())) {
- GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
- entry.cached().partition());
+ GridDistributedTxMapping global = globalMap.get(n.id());
- if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
- T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue();
+ if (!F.isEmpty(entry.entryProcessors())) {
+ GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
+ entry.cached().partition());
- assert procVal != null : entry;
+ if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
+ T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue();
- entry.op(procVal.get1());
- entry.value(procVal.get2(), true, false);
- entry.entryProcessors(null);
- }
- }
+ assert procVal != null : entry;
- if (global == null)
- globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
-
- global.add(entry);
+ entry.op(procVal.get1());
+ entry.value(procVal.get2(), true, false);
+ entry.entryProcessors(null);
}
}
+
+ if (global == null)
+ globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
+
+ global.add(entry);
}
/**
@@ -1602,7 +1610,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
private static final long serialVersionUID = 0L;
/** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
+ private final int futId;
/** Node ID. */
private UUID nodeId;
@@ -1617,17 +1625,20 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/**
* @param nodeId Node ID.
+ * @param futId Future ID.
* @param dhtMapping Mapping.
* @param nearMapping nearMapping.
*/
MiniFuture(
UUID nodeId,
+ int futId,
GridDistributedTxMapping dhtMapping,
GridDistributedTxMapping nearMapping
) {
assert dhtMapping == null || nearMapping == null || dhtMapping.primary().equals(nearMapping.primary());
this.nodeId = nodeId;
+ this.futId = futId;
this.dhtMapping = dhtMapping;
this.nearMapping = nearMapping;
}
@@ -1635,7 +1646,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/**
* @return Future ID.
*/
- IgniteUuid futureId() {
+ int futureId() {
return futId;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d3d4a36b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 8c01302..8898803 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -59,7 +59,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
private IgniteUuid futId;
/** Mini future ID. */
- private IgniteUuid miniId;
+ private int miniId;
/** Topology version. */
private AffinityTopologyVersion topVer;
@@ -120,7 +120,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
*/
public GridDhtTxPrepareRequest(
IgniteUuid futId,
- IgniteUuid miniId,
+ int miniId,
AffinityTopologyVersion topVer,
GridDhtTxLocalAdapter tx,
long timeout,
@@ -145,7 +145,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
addDepInfo);
assert futId != null;
- assert miniId != null;
+ assert miniId != 0;
this.topVer = topVer;
this.futId = futId;
@@ -245,7 +245,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
@@ -361,7 +361,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
writer.incrementState();
case 22:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
@@ -453,7 +453,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 22:
- miniId = reader.readIgniteUuid("miniId");
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d3d4a36b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index fdead95..416540a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -58,7 +58,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
private IgniteUuid futId;
/** Mini future ID. */
- private IgniteUuid miniId;
+ private int miniId;
/** Invalid partitions by cache ID. */
@GridDirectMap(keyType = Integer.class, valueType = int[].class)
@@ -81,11 +81,11 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
* @param miniId Mini future ID.
* @param addDepInfo Deployment info flag.
*/
- public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, boolean addDepInfo) {
+ public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, int miniId, boolean addDepInfo) {
super(xid, addDepInfo);
assert futId != null;
- assert miniId != null;
+ assert miniId != 0;
this.futId = futId;
this.miniId = miniId;
@@ -98,12 +98,12 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
* @param err Error.
* @param addDepInfo Deployment enabled.
*/
- public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, Throwable err,
+ public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, int miniId, Throwable err,
boolean addDepInfo) {
super(xid, err, addDepInfo);
assert futId != null;
- assert miniId != null;
+ assert miniId != 0;
this.futId = futId;
this.miniId = miniId;
@@ -112,7 +112,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
/**
* @return Evicted readers.
*/
- public Collection<IgniteTxKey> nearEvicted() {
+ Collection<IgniteTxKey> nearEvicted() {
return nearEvicted;
}
@@ -133,14 +133,14 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
/**
* @return Mini future ID.
*/
- public IgniteUuid miniId() {
+ public int miniId() {
return miniId;
}
/**
* @return Map from cacheId to an array of invalid partitions.
*/
- public Map<Integer, int[]> invalidPartitionsByCacheId() {
+ Map<Integer, int[]> invalidPartitionsByCacheId() {
return invalidParts;
}
@@ -250,7 +250,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
writer.incrementState();
case 10:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeInt("miniId", miniId))
return false;
writer.incrementState();
@@ -300,7 +300,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
case 10:
- miniId = reader.readIgniteUuid("miniId");
+ miniId = reader.readInt("miniId");
if (!reader.isLastRead())
return false;