You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/13 14:35:59 UTC

[30/50] ignite git commit: ignite-5578 Affinity for local join

ignite-5578 Affinity for local join


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d47d9b78
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d47d9b78
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d47d9b78

Branch: refs/heads/ignite-5578
Commit: d47d9b785048c96b2b0ffd5b22046050a7c20c78
Parents: 0b9ba7a
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 12 10:30:44 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 12 10:30:44 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |  5 +-
 .../GridDhtPartitionsExchangeFuture.java        | 64 ++++++++++++++++++--
 2 files changed, 61 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d47d9b78/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 6794c2b..45586c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1224,16 +1224,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
-     * @param topVer Topology version.
      * @param grpId Cache group ID.
      * @return Affinity assignments.
      */
-    public List<List<ClusterNode>> affinity(AffinityTopologyVersion topVer, Integer grpId) {
+    public GridAffinityAssignmentCache affinity(Integer grpId) {
         CacheGroupHolder grpHolder = grpHolders.get(grpId);
 
         assert grpHolder != null : grpId;
 
-        return grpHolder.affinity().assignments(topVer);
+        return grpHolder.affinity();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d47d9b78/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 1039392..81b288c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
@@ -77,6 +78,7 @@ import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMess
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.GridPartitionStateMap;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -1711,6 +1713,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         }
     }
 
+    /**
+     * @param affReq Cache group IDs.
+     * @param cachesAff Optional already prepared affinity.
+     * @return Affinity.
+     */
     private Map<Integer, CacheGroupAffinity> initCachesAffinity(Collection<Integer> affReq,
        @Nullable Map<Integer, CacheGroupAffinity> cachesAff) {
        assert !F.isEmpty(affReq);
@@ -1720,7 +1727,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         for (Integer grpId : affReq) {
             if (!cachesAff.containsKey(grpId)) {
-                List<List<ClusterNode>> assign = cctx.affinity().affinity(topologyVersion(), grpId);
+                List<List<ClusterNode>> assign = cctx.affinity().affinity(grpId).assignments(topologyVersion());
 
                 cachesAff.put(grpId, new CacheGroupAffinity(grpId, assign));
             }
@@ -1743,8 +1750,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     if (!grp.isLocal()) {
                         if (localJoinExchange() && grp.affinity().lastVersion().topologyVersion() == -1L) {
                             List<List<ClusterNode>> aff = grp.affinity().calculate(topologyVersion(),
-                                    discoEvt,
-                                    discoCache);
+                                discoEvt,
+                                discoCache);
 
                             grp.affinity().initialize(topologyVersion(), aff);
                         }
@@ -1756,9 +1763,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             Map<Integer, CacheGroupAffinity> cachesAff = null;
 
-            for (GridDhtPartitionsSingleMessage msg : msgs.values()) {
+            for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
+                GridDhtPartitionsSingleMessage msg = e.getValue();
+
+                // Apply update counters after all single messages are received.
                 for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
                     Integer grpId = entry.getKey();
+
                     CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
                     GridDhtPartitionTopology top = grp != null ? grp.topology() :
@@ -1772,8 +1783,41 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                 Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
 
-                if (affReq != null)
+                if (affReq != null) {
                     cachesAff = initCachesAffinity(affReq, cachesAff);
+
+                    UUID nodeId = e.getKey();
+
+                    // If node requested affinity on join and partitions are not created, then
+                    // all affinity partitions should be in MOVING state.
+                    for (Integer grpId : affReq) {
+                        GridDhtPartitionMap partMap = msg.partitions().get(grpId);
+
+                        if (partMap == null || F.isEmpty(partMap.map())) {
+                            if (partMap == null) {
+                                partMap = new GridDhtPartitionMap(nodeId,
+                                    1L,
+                                    topologyVersion(),
+                                    new GridPartitionStateMap(),
+                                    false);
+                            }
+
+                            AffinityAssignment aff = cctx.affinity().affinity(grpId).cachedAffinity(topologyVersion());
+
+                            for (int p = 0; p < aff.assignment().size(); p++) {
+                                if (aff.getIds(p).contains(nodeId))
+                                    partMap.put(p, GridDhtPartitionState.MOVING);
+                            }
+
+                            CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+
+                            GridDhtPartitionTopology top = grp != null ? grp.topology() :
+                                cctx.exchange().clientTopology(grpId, this);
+
+                            top.update(exchId, partMap);
+                        }
+                    }
+                }
             }
 
             if (discoEvt.type() == EVT_NODE_JOINED)
@@ -2039,6 +2083,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                     grp.affinity().initialize(topologyVersion(), assignments0);
 
+                    try {
+                        grp.topology().initPartitions(this);
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        U.warn(log, "Interrupted when initialize local partitions.");
+
+                        return;
+                    }
+
                     cnt++;
                 }
             }
@@ -2100,6 +2153,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /**
      * Updates partition map in all caches.
      *
+     * @param node Node sent message.
      * @param msg Partitions single message.
      */
     private void updatePartitionSingleMap(ClusterNode node, GridDhtPartitionsSingleMessage msg) {