You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/19 10:13:32 UTC

[3/6] ignite git commit: ignite-5578

ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8fb60ffc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8fb60ffc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8fb60ffc

Branch: refs/heads/ignite-5578
Commit: 8fb60ffcff592b7128ae88151f0fa07c016e0126
Parents: 056847c
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 19 12:50:36 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 19 12:50:36 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/DiscoCache.java | 14 ++++
 .../discovery/GridDiscoveryManager.java         | 31 +++++++--
 .../cache/CacheAffinitySharedManager.java       | 46 ++++++++++++
 .../preloader/CacheGroupAffinityMessage.java    |  4 +-
 .../GridDhtPartitionsExchangeFuture.java        | 73 +++++---------------
 .../preloader/GridDhtPartitionsFullMessage.java | 20 ++++--
 6 files changed, 120 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index f63c5f6..1d8cfdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -85,7 +86,11 @@ public class DiscoCache {
     /** */
     private final IgniteProductVersion minNodeVer;
 
+    /** Topology version passed to this cache at construction. */
+    private final AffinityTopologyVersion topVer;
+
     /**
+     * @param topVer Topology version.
      * @param state Current cluster state.
      * @param loc Local node.
      * @param rmtNodes Remote nodes.
@@ -101,6 +106,7 @@ public class DiscoCache {
      * @param alives Alive nodes.
      */
     DiscoCache(
+        AffinityTopologyVersion topVer,
         DiscoveryDataClusterState state,
         ClusterNode loc,
         List<ClusterNode> rmtNodes,
@@ -114,6 +120,7 @@ public class DiscoCache {
         Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
         Map<UUID, ClusterNode> nodeMap,
         Set<UUID> alives) {
+        this.topVer = topVer;
         this.state = state;
         this.loc = loc;
         this.rmtNodes = rmtNodes;
@@ -143,6 +150,13 @@ public class DiscoCache {
     }
 
     /**
+     * @return Topology version that was passed to this cache at construction.
+     */
+    public AffinityTopologyVersion version() {
+        return topVer;
+    }
+
+    /**
      * @return Minimum node version.
      */
     public IgniteProductVersion minimumNodeVersion() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 347f6fe..1e34f0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -620,7 +620,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 ChangeGlobalStateFinishMessage stateFinishMsg = null;
 
                 if (locJoinEvt) {
-                    discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+                    discoCache = createDiscoCache(new AffinityTopologyVersion(topVer, minorTopVer),
+                        ctx.state().clusterState(),
+                        locNode,
+                        topSnapshot);
 
                     transitionWaitFut = ctx.state().onLocalJoin(discoCache);
                 }
@@ -643,7 +646,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     else if (customMsg instanceof ChangeGlobalStateFinishMessage) {
                         ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg);
 
-                        discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+                        discoCache = createDiscoCache(topSnap.get().topVer,
+                            ctx.state().clusterState(),
+                            locNode,
+                            topSnapshot);
 
                         topSnap.set(new Snapshot(topSnap.get().topVer, discoCache));
 
@@ -691,7 +697,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 // event notifications, since SPI notifies manager about all events from this listener.
                 if (verChanged) {
                     if (discoCache == null)
-                        discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+                        discoCache = createDiscoCache(
+                            nextTopVer,
+                            ctx.state().clusterState(),
+                            locNode,
+                            topSnapshot);
 
                     discoCacheHist.put(nextTopVer, discoCache);
 
@@ -761,8 +771,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                     topHist.clear();
 
-                    topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
-                        createDiscoCache(ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet())));
+                    topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, createDiscoCache(
+                        AffinityTopologyVersion.ZERO,
+                        ctx.state().clusterState(),
+                        locNode,
+                        Collections.<ClusterNode>emptySet())));
                 }
                 else if (type == EVT_CLIENT_NODE_RECONNECTED) {
                     assert locNode.isClient() : locNode;
@@ -2170,12 +2183,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /**
      * Called from discovery thread.
      *
+     * @param topVer Topology version.
      * @param state Current state.
      * @param loc Local node.
      * @param topSnapshot Topology snapshot.
      * @return Newly created discovery cache.
      */
-    @NotNull private DiscoCache createDiscoCache(DiscoveryDataClusterState state,
+    @NotNull private DiscoCache createDiscoCache(
+        AffinityTopologyVersion topVer,
+        DiscoveryDataClusterState state,
         ClusterNode loc,
         Collection<ClusterNode> topSnapshot) {
         HashSet<UUID> alives = U.newHashSet(topSnapshot.size());
@@ -2252,6 +2268,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         }
 
         return new DiscoCache(
+            topVer,
             state,
             loc,
             Collections.unmodifiableList(rmtNodes),
@@ -2394,7 +2411,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         discoWrk.addEvent(EVT_NODE_SEGMENTED,
                             AffinityTopologyVersion.NONE,
                             node,
-                            createDiscoCache(null, node, empty),
+                            createDiscoCache(AffinityTopologyVersion.NONE, null, node, empty),
                             empty,
                             null);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index a59f5d7..bb27613 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -51,8 +51,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffini
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -1246,6 +1248,50 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
+     * @param fut Local join exchange future (source of topology version, discovery event and cache).
+     * @param msg Full message whose {@code joinedNodeAffinity()} map (by group ID) provides assignments.
+     */
+    public void onLocalJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg) {
+        final Set<Integer> affReq = fut.context().groupsAffinityRequestOnJoin();
+
+        if (F.isEmpty(affReq))
+            return;
+
+        final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
+
+        final Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = msg.joinedNodeAffinity();
+
+        assert !F.isEmpty(joinedNodeAff) : msg;
+        assert joinedNodeAff.size() >= affReq.size();
+
+        forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+            @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
+                if (affReq.contains(aff.groupId())) {
+                    assert AffinityTopologyVersion.NONE.equals(aff.lastVersion());
+
+                    CacheGroupAffinityMessage affMsg = joinedNodeAff.get(aff.groupId());
+
+                    assert affMsg != null;
+
+                    List<List<ClusterNode>> assignments = affMsg.createAssignments(nodesByOrder, fut.discoCache());
+
+                    // Calculate ideal assignments locally unless the affinity function is centralized.
+                    if (!aff.centralizedAffinityFunction())
+                        aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+
+                    aff.initialize(fut.topologyVersion(), assignments);
+
+                    CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
+
+                    assert grp != null;
+
+                    grp.topology().initPartitions(fut);
+                }
+            }
+        });
+    }
+
+    /**
      * Called on exchange initiated by server node join.
      *
      * @param fut Exchange future.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
index ee4ef45..726054d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
@@ -94,7 +94,7 @@ public class CacheGroupAffinityMessage implements Message {
             if (!cachesAff.containsKey(grpId)) {
                 List<List<ClusterNode>> assign = cctx.affinity().affinity(grpId).assignments(topVer);
 
-                cachesAff.put(grpId, new CacheGroupAffinityMessage(grpId, assign));
+                cachesAff.put(grpId, new CacheGroupAffinityMessage(assign));
             }
         }
 
@@ -199,7 +199,7 @@ public class CacheGroupAffinityMessage implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 2;
+        return 1;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 6da7876..2c9119f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1189,10 +1189,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
     /**
      * @param nodes Nodes.
-     * @param cachesAff Affinity if was requested by some nodes.
+     * @param joinedNodeAff Affinity by cache group ID if it was requested by some nodes, otherwise {@code null}.
      * @throws IgniteCheckedException If failed.
      */
-    private void sendAllPartitions(Collection<ClusterNode> nodes, Collection<CacheGroupAffinityMessage> cachesAff)
+    private void sendAllPartitions(Collection<ClusterNode> nodes, Map<Integer, CacheGroupAffinityMessage> joinedNodeAff)
         throws IgniteCheckedException {
         boolean singleNode = nodes.size() == 1;
 
@@ -1212,15 +1212,18 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         for (ClusterNode node : nodes) {
             GridDhtPartitionsFullMessage sndMsg = msg;
 
-            if (cachesAff != null) {
+            if (joinedNodeAff != null) {
                 if (singleNode)
-                    msg.cachesAffinity(cachesAff);
+                    msg.joinedNodeAffinity(joinedNodeAff);
                 else {
                     GridDhtPartitionsSingleMessage singleMsg = msgs.get(node.id());
 
                     if (singleMsg != null && singleMsg.cacheGroupsAffinityRequest() != null) {
-                        if (msgWithAff == null)
-                            msgWithAff = msg.copyWithAffinity(cachesAff);
+                        if (msgWithAff == null) {
+                            msgWithAff = msg.copy();
+
+                            msgWithAff.joinedNodeAffinity(joinedNodeAff);
+                        }
 
                         sndMsg = msgWithAff;
                     }
@@ -1747,7 +1750,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
-            Map<Integer, CacheGroupAffinityMessage> cachesAff = null;
+            Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null;
 
             for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
                 GridDhtPartitionsSingleMessage msg = e.getValue();
@@ -1770,10 +1773,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
 
                 if (affReq != null) {
-                    cachesAff = CacheGroupAffinityMessage.createAffinityMessages(cctx,
+                    joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages(cctx,
                         topologyVersion(),
                         affReq,
-                        cachesAff);
+                        joinedNodeAff);
 
                     UUID nodeId = e.getKey();
 
@@ -1880,7 +1883,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
 
                 if (!nodes.isEmpty())
-                    sendAllPartitions(nodes, cachesAff != null ? cachesAff.values() : null);
+                    sendAllPartitions(nodes, joinedNodeAff);
 
                 onDone(exchangeId().topologyVersion(), err);
             }
@@ -1919,19 +1922,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (n != null) {
                 Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
 
-                Collection<CacheGroupAffinityMessage> cachesAff = null;
+                Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null;
 
                 if (affReq != null) {
-                    Map<Integer, CacheGroupAffinityMessage> affMap = CacheGroupAffinityMessage.createAffinityMessages(
+                    joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages(
                         cctx,
                         msg.exchangeId().topologyVersion(),
                         affReq,
                         null);
-
-                    cachesAff = affMap.values();
                 }
 
-                sendAllPartitions(F.asList(n), cachesAff);
+                sendAllPartitions(F.asList(n), joinedNodeAff);
             }
         }
         catch (IgniteCheckedException e) {
@@ -2055,46 +2056,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin();
 
-        if (localJoinExchange() && affReq != null) {
-            Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
-
-            Collection<CacheGroupAffinityMessage> cachesAff = msg.cachesAffinity();
-
-            assert !F.isEmpty(cachesAff) : msg;
-            assert cachesAff.size() >= affReq.size();
-
-            int cnt = 0;
-
-            for (CacheGroupAffinityMessage aff : cachesAff) {
-                if (affReq.contains(aff.groupId())) {
-                    CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
-
-                    assert grp != null : aff.groupId();
-                    assert AffinityTopologyVersion.NONE.equals(grp.affinity().lastVersion());
-
-                    List<List<ClusterNode>> assignments = aff.createAssignments(nodesByOrder, discoCache);
-
-                    // Calculate ideal assignments.
-                    if (!grp.affinity().centralizedAffinityFunction())
-                        grp.affinity().calculate(topologyVersion(), discoEvt, discoCache);
-
-                    grp.affinity().initialize(topologyVersion(), assignments);
-
-                    try {
-                        grp.topology().initPartitions(this);
-                    }
-                    catch (IgniteInterruptedCheckedException e) {
-                        U.warn(log, "Interrupted when initialize local partitions.");
-
-                        return;
-                    }
-
-                    cnt++;
-                }
-            }
-
-            assert affReq.size() == cnt : cnt;
-        }
+        if (localJoinExchange() && affReq != null)
+            cctx.affinity().onLocalJoin(this, msg);
 
         updatePartitionFullMap(msg);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 1ea8757..a4258c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -456,15 +456,20 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         }
 
         switch (writer.state()) {
+            case 5:
+                if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
 
             case 6:
-                if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
+                if (!writer.writeByteArray("errsBytes", errsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeByteArray("errsBytes", errsBytes))
+                if (!writer.writeMap("joinedNodeAff", joinedNodeAff, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
@@ -515,9 +520,16 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
             return false;
 
         switch (reader.state()) {
+            case 5:
+                dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
 
             case 6:
-                dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
+                errsBytes = reader.readByteArray("errsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -525,7 +537,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 7:
-                errsBytes = reader.readByteArray("errsBytes");
+                joinedNodeAff = reader.readMap("joinedNodeAff", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
 
                 if (!reader.isLastRead())
                     return false;