You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/13 14:35:59 UTC
[30/50] ignite git commit: ignite-5578 Affinity for local join
ignite-5578 Affinity for local join
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d47d9b78
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d47d9b78
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d47d9b78
Branch: refs/heads/ignite-5578
Commit: d47d9b785048c96b2b0ffd5b22046050a7c20c78
Parents: 0b9ba7a
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 12 10:30:44 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 12 10:30:44 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 5 +-
.../GridDhtPartitionsExchangeFuture.java | 64 ++++++++++++++++++--
2 files changed, 61 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d47d9b78/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 6794c2b..45586c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1224,16 +1224,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
- * @param topVer Topology version.
* @param grpId Cache group ID.
* @return Affinity assignments.
*/
- public List<List<ClusterNode>> affinity(AffinityTopologyVersion topVer, Integer grpId) {
+ public GridAffinityAssignmentCache affinity(Integer grpId) {
CacheGroupHolder grpHolder = grpHolders.get(grpId);
assert grpHolder != null : grpId;
- return grpHolder.affinity().assignments(topVer);
+ return grpHolder.affinity();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/d47d9b78/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 1039392..81b288c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
@@ -77,6 +78,7 @@ import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMess
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.GridPartitionStateMap;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -1711,6 +1713,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
+ /**
+ * @param affReq Cache group IDs.
+ * @param cachesAff Optional already prepared affinity.
+ * @return Affinity.
+ */
private Map<Integer, CacheGroupAffinity> initCachesAffinity(Collection<Integer> affReq,
@Nullable Map<Integer, CacheGroupAffinity> cachesAff) {
assert !F.isEmpty(affReq);
@@ -1720,7 +1727,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
for (Integer grpId : affReq) {
if (!cachesAff.containsKey(grpId)) {
- List<List<ClusterNode>> assign = cctx.affinity().affinity(topologyVersion(), grpId);
+ List<List<ClusterNode>> assign = cctx.affinity().affinity(grpId).assignments(topologyVersion());
cachesAff.put(grpId, new CacheGroupAffinity(grpId, assign));
}
@@ -1743,8 +1750,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (!grp.isLocal()) {
if (localJoinExchange() && grp.affinity().lastVersion().topologyVersion() == -1L) {
List<List<ClusterNode>> aff = grp.affinity().calculate(topologyVersion(),
- discoEvt,
- discoCache);
+ discoEvt,
+ discoCache);
grp.affinity().initialize(topologyVersion(), aff);
}
@@ -1756,9 +1763,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Map<Integer, CacheGroupAffinity> cachesAff = null;
- for (GridDhtPartitionsSingleMessage msg : msgs.values()) {
+ for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
+ GridDhtPartitionsSingleMessage msg = e.getValue();
+
+ // Apply update counters after all single messages are received.
for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
Integer grpId = entry.getKey();
+
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
GridDhtPartitionTopology top = grp != null ? grp.topology() :
@@ -1772,8 +1783,41 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
- if (affReq != null)
+ if (affReq != null) {
cachesAff = initCachesAffinity(affReq, cachesAff);
+
+ UUID nodeId = e.getKey();
+
+ // If node requested affinity on join and partitions are not created, then
+ // all affinity partitions should be in MOVING state.
+ for (Integer grpId : affReq) {
+ GridDhtPartitionMap partMap = msg.partitions().get(grpId);
+
+ if (partMap == null || F.isEmpty(partMap.map())) {
+ if (partMap == null) {
+ partMap = new GridDhtPartitionMap(nodeId,
+ 1L,
+ topologyVersion(),
+ new GridPartitionStateMap(),
+ false);
+ }
+
+ AffinityAssignment aff = cctx.affinity().affinity(grpId).cachedAffinity(topologyVersion());
+
+ for (int p = 0; p < aff.assignment().size(); p++) {
+ if (aff.getIds(p).contains(nodeId))
+ partMap.put(p, GridDhtPartitionState.MOVING);
+ }
+
+ CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+
+ GridDhtPartitionTopology top = grp != null ? grp.topology() :
+ cctx.exchange().clientTopology(grpId, this);
+
+ top.update(exchId, partMap);
+ }
+ }
+ }
}
if (discoEvt.type() == EVT_NODE_JOINED)
@@ -2039,6 +2083,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
grp.affinity().initialize(topologyVersion(), assignments0);
+ try {
+ grp.topology().initPartitions(this);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ U.warn(log, "Interrupted when initialize local partitions.");
+
+ return;
+ }
+
cnt++;
}
}
@@ -2100,6 +2153,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* Updates partition map in all caches.
*
+ * @param node Node sent message.
* @param msg Partitions single message.
*/
private void updatePartitionSingleMap(ClusterNode node, GridDhtPartitionsSingleMessage msg) {