You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/14 08:00:32 UTC

[13/38] ignite git commit: ignite-4768

ignite-4768


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d3d4a36b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d3d4a36b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d3d4a36b

Branch: refs/heads/ignite-4768
Commit: d3d4a36b4f0c71b5635dadd9211f73e728e29483
Parents: 784b171
Author: sboikov <sb...@gridgain.com>
Authored: Thu Mar 9 16:38:54 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 9 16:38:54 2017 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtTxPrepareFuture.java | 85 +++++++++++---------
 .../dht/GridDhtTxPrepareRequest.java            | 12 +--
 .../dht/GridDhtTxPrepareResponse.java           | 20 ++---
 3 files changed, 64 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d3d4a36b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index d093b4a..735653d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -562,7 +562,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      * @return Mini future.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    private MiniFuture miniFuture(IgniteUuid miniId) {
+    private MiniFuture miniFuture(int miniId) {
         // We iterate directly over the futs collection here to avoid copy.
         synchronized (sync) {
             int size = futuresCountNoLock();
@@ -576,7 +576,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
                 MiniFuture mini = (MiniFuture)fut;
 
-                if (mini.futureId().equals(miniId)) {
+                if (mini.futureId() == miniId) {
                     if (!mini.isDone())
                         return mini;
                     else
@@ -1233,6 +1233,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 return;
 
             if (last) {
+                int miniId = 0;
+
                 assert tx.transactionNodes() != null;
 
                 final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
@@ -1257,7 +1259,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                     if (tx.remainingTime() == -1)
                         return;
 
-                    MiniFuture fut = new MiniFuture(n.id(), dhtMapping, nearMapping);
+                    MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
 
                     add(fut); // Append new future.
 
@@ -1371,7 +1373,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         if (tx.remainingTime() == -1)
                             return;
 
-                        MiniFuture fut = new MiniFuture(nearMapping.primary().id(), null, nearMapping);
+                        MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping);
 
                         add(fut); // Append new future.
 
@@ -1481,25 +1483,35 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
             try {
                 List<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion());
 
+                assert dhtNodes.size() > 0 && dhtNodes.get(0).id().equals(cctx.localNodeId()) : dhtNodes;
+
                 if (log.isDebugEnabled())
                     log.debug("Mapping entry to DHT nodes [nodes=" + U.toShortString(dhtNodes) +
                         ", entry=" + entry + ']');
 
-                // Exclude local node.
-                map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap);
+                for (int i = 1; i < dhtNodes.size(); i++) {
+                    ClusterNode node = dhtNodes.get(i);
+
+                    addMapping(entry, node, dhtMap);
+                }
 
                 Collection<UUID> readers = cached.readers();
 
                 if (!F.isEmpty(readers)) {
-                    Collection<ClusterNode> nearNodes =
-                        cctx.discovery().nodes(readers, F0.not(F.idForNodeId(tx.nearNodeId())));
+                    for (UUID readerId : readers) {
+                        if (readerId.equals(tx.nearNodeId()))
+                            continue;
 
-                    if (log.isDebugEnabled())
-                        log.debug("Mapping entry to near nodes [nodes=" + U.toShortString(nearNodes) +
-                            ", entry=" + entry + ']');
+                        ClusterNode readerNode = cctx.discovery().node(readerId);
+
+                        if (readerNode == null || dhtNodes.contains(readerNode))
+                            continue;
+
+                        if (log.isDebugEnabled())
+                            log.debug("Mapping entry to near node [node=" + readerNode + ", entry=" + entry + ']');
 
-                    // Exclude DHT nodes.
-                    map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap);
+                        addMapping(entry, readerNode, nearMap);
+                    }
                 }
                 else if (log.isDebugEnabled())
                     log.debug("Entry has no near readers: " + entry);
@@ -1516,39 +1528,35 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
     /**
      * @param entry Entry.
-     * @param nodes Nodes.
+     * @param n Node.
      * @param globalMap Map.
      */
-    private void map(
+    private void addMapping(
         IgniteTxEntry entry,
-        Iterable<ClusterNode> nodes,
+        ClusterNode n,
         Map<UUID, GridDistributedTxMapping> globalMap
     ) {
-        if (nodes != null) {
-            for (ClusterNode n : nodes) {
-                GridDistributedTxMapping global = globalMap.get(n.id());
-
-                if (!F.isEmpty(entry.entryProcessors())) {
-                    GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
-                        entry.cached().partition());
+        GridDistributedTxMapping global = globalMap.get(n.id());
 
-                    if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
-                        T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue();
+        if (!F.isEmpty(entry.entryProcessors())) {
+            GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
+                entry.cached().partition());
 
-                        assert procVal != null : entry;
+            if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
+                T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue();
 
-                        entry.op(procVal.get1());
-                        entry.value(procVal.get2(), true, false);
-                        entry.entryProcessors(null);
-                    }
-                }
+                assert procVal != null : entry;
 
-                if (global == null)
-                    globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
-
-                global.add(entry);
+                entry.op(procVal.get1());
+                entry.value(procVal.get2(), true, false);
+                entry.entryProcessors(null);
             }
         }
+
+        if (global == null)
+            globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
+
+        global.add(entry);
     }
 
     /**
@@ -1602,7 +1610,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         private static final long serialVersionUID = 0L;
 
         /** */
-        private final IgniteUuid futId = IgniteUuid.randomUuid();
+        private final int futId;
 
         /** Node ID. */
         private UUID nodeId;
@@ -1617,17 +1625,20 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
         /**
          * @param nodeId Node ID.
+         * @param futId Future ID.
          * @param dhtMapping Mapping.
          * @param nearMapping nearMapping.
          */
         MiniFuture(
             UUID nodeId,
+            int futId,
             GridDistributedTxMapping dhtMapping,
             GridDistributedTxMapping nearMapping
         ) {
             assert dhtMapping == null || nearMapping == null || dhtMapping.primary().equals(nearMapping.primary());
 
             this.nodeId = nodeId;
+            this.futId = futId;
             this.dhtMapping = dhtMapping;
             this.nearMapping = nearMapping;
         }
@@ -1635,7 +1646,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         /**
          * @return Future ID.
          */
-        IgniteUuid futureId() {
+        int futureId() {
             return futId;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3d4a36b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 8c01302..8898803 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -59,7 +59,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
     private IgniteUuid futId;
 
     /** Mini future ID. */
-    private IgniteUuid miniId;
+    private int miniId;
 
     /** Topology version. */
     private AffinityTopologyVersion topVer;
@@ -120,7 +120,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
      */
     public GridDhtTxPrepareRequest(
         IgniteUuid futId,
-        IgniteUuid miniId,
+        int miniId,
         AffinityTopologyVersion topVer,
         GridDhtTxLocalAdapter tx,
         long timeout,
@@ -145,7 +145,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
             addDepInfo);
 
         assert futId != null;
-        assert miniId != null;
+        assert miniId != 0;
 
         this.topVer = topVer;
         this.futId = futId;
@@ -245,7 +245,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
     /**
      * @return Mini future ID.
      */
-    public IgniteUuid miniId() {
+    public int miniId() {
         return miniId;
     }
 
@@ -361,7 +361,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 writer.incrementState();
 
             case 22:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeInt("miniId", miniId))
                     return false;
 
                 writer.incrementState();
@@ -453,7 +453,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 22:
-                miniId = reader.readIgniteUuid("miniId");
+                miniId = reader.readInt("miniId");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3d4a36b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index fdead95..416540a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -58,7 +58,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     private IgniteUuid futId;
 
     /** Mini future ID. */
-    private IgniteUuid miniId;
+    private int miniId;
 
     /** Invalid partitions by cache ID. */
     @GridDirectMap(keyType = Integer.class, valueType = int[].class)
@@ -81,11 +81,11 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
      * @param miniId Mini future ID.
      * @param addDepInfo Deployment info flag.
      */
-    public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, boolean addDepInfo) {
+    public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, int miniId, boolean addDepInfo) {
         super(xid, addDepInfo);
 
         assert futId != null;
-        assert miniId != null;
+        assert miniId != 0;
 
         this.futId = futId;
         this.miniId = miniId;
@@ -98,12 +98,12 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
      * @param err Error.
      * @param addDepInfo Deployment enabled.
      */
-    public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, Throwable err,
+    public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, int miniId, Throwable err,
         boolean addDepInfo) {
         super(xid, err, addDepInfo);
 
         assert futId != null;
-        assert miniId != null;
+        assert miniId != 0;
 
         this.futId = futId;
         this.miniId = miniId;
@@ -112,7 +112,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     /**
      * @return Evicted readers.
      */
-    public Collection<IgniteTxKey> nearEvicted() {
+    Collection<IgniteTxKey> nearEvicted() {
         return nearEvicted;
     }
 
@@ -133,14 +133,14 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     /**
      * @return Mini future ID.
      */
-    public IgniteUuid miniId() {
+    public int miniId() {
         return miniId;
     }
 
     /**
      * @return Map from cacheId to an array of invalid partitions.
      */
-    public Map<Integer, int[]> invalidPartitionsByCacheId() {
+    Map<Integer, int[]> invalidPartitionsByCacheId() {
         return invalidParts;
     }
 
@@ -250,7 +250,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeInt("miniId", miniId))
                     return false;
 
                 writer.incrementState();
@@ -300,7 +300,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
                 reader.incrementState();
 
             case 10:
-                miniId = reader.readIgniteUuid("miniId");
+                miniId = reader.readInt("miniId");
 
                 if (!reader.isLastRead())
                     return false;