You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/13 14:36:01 UTC
[32/50] ignite git commit: ignite-5578 Affinity for local join
ignite-5578 Affinity for local join
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0c712412
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0c712412
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0c712412
Branch: refs/heads/ignite-5578
Commit: 0c7124122e8eebaad1a85f844277a5cf1564a8de
Parents: 62e3e70
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 12 11:32:41 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 12 15:49:44 2017 +0300
----------------------------------------------------------------------
.../affinity/GridAffinityAssignmentCache.java | 8 +
.../cache/CacheAffinitySharedManager.java | 105 ++++++----
.../GridCachePartitionExchangeManager.java | 19 +-
.../GridDhtPartitionsExchangeFuture.java | 191 +++++++++++++------
4 files changed, 233 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c712412/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index a8c6c59..a8ac825 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -337,6 +337,14 @@ public class GridAffinityAssignmentCache {
}
/**
+ * @param topVer Topology version to compare with.
+ * @return {@code True} if the given version equals {@link #lastVersion()}.
+ */
+ public boolean lastVersionEquals(AffinityTopologyVersion topVer) {
+ return topVer.equals(lastVersion());
+ }
+
+ /**
* @return Last calculated affinity version.
*/
public AffinityTopologyVersion lastVersion() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c712412/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 45586c7..3f24547 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1230,7 +1230,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
public GridAffinityAssignmentCache affinity(Integer grpId) {
CacheGroupHolder grpHolder = grpHolders.get(grpId);
- assert grpHolder != null : grpId;
+ assert grpHolder != null : debugGroupName(grpId);
return grpHolder.affinity();
}
@@ -1311,6 +1311,19 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
+ * @param grpId Group ID.
+ * @return Cache or group name if a descriptor is found for the ID, otherwise an "Unknown group" marker with the ID.
+ */
+ private String debugGroupName(int grpId) {
+ CacheGroupDescriptor desc = caches.group(grpId);
+
+ if (desc != null)
+ return desc.cacheOrGroupName();
+ else
+ return "Unknown group: " + grpId;
+ }
+
+ /**
* @param fut Exchange future.
* @throws IgniteCheckedException If failed.
*/
@@ -1450,8 +1463,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @throws IgniteCheckedException If failed.
* @return Future completed when caches initialization is done.
*/
- private IgniteInternalFuture<?> initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut)
- throws IgniteCheckedException {
+ public IgniteInternalFuture<?> initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut,
+ final boolean newAff) throws IgniteCheckedException {
final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new ArrayList<>();
forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
@@ -1483,51 +1496,75 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
final GridAffinityAssignmentCache aff = grpHolder.affinity();
- List<GridDhtPartitionsExchangeFuture> exchFuts = cctx.exchange().exchangeFutures();
+ if (newAff) {
+ if (!aff.lastVersionEquals(fut.topologyVersion())) {
+ List<List<ClusterNode>> assign =
+ aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
- int idx = exchFuts.indexOf(fut);
+ aff.initialize(fut.topologyVersion(), assign);
+ }
+ }
+ else {
+ List<GridDhtPartitionsExchangeFuture> exchFuts = cctx.exchange().exchangeFutures();
- assert idx >= 0 && idx < exchFuts.size() - 1 : "Invalid exchange futures state [cur=" + idx +
- ", total=" + exchFuts.size() + ']';
+ int idx = exchFuts.indexOf(fut);
- final GridDhtPartitionsExchangeFuture prev = exchFuts.get(idx + 1);
+ assert idx >= 0 && idx < exchFuts.size() - 1 : "Invalid exchange futures state [cur=" + idx +
+ ", total=" + exchFuts.size() + ']';
- if (log.isDebugEnabled()) {
- log.debug("Need initialize affinity on coordinator [" +
- "cacheGrp=" + desc.cacheOrGroupName() +
- "prevAff=" + prev.topologyVersion() + ']');
- }
+ final GridDhtPartitionsExchangeFuture prev = exchFuts.get(idx + 1);
- assert prev.topologyVersion().compareTo(fut.topologyVersion()) < 0 : prev;
+ if (log.isDebugEnabled()) {
+ log.debug("Need initialize affinity on coordinator [" +
+ "cacheGrp=" + desc.cacheOrGroupName() +
+ "prevAff=" + prev.topologyVersion() + ']');
+ }
- GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
- desc.groupId(),
- prev.topologyVersion(),
- prev.discoCache());
+ assert prev.topologyVersion().compareTo(fut.topologyVersion()) < 0 : prev;
- fetchFut.init(false);
+ GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
+ desc.groupId(),
+ prev.topologyVersion(),
+ prev.discoCache());
- final GridFutureAdapter<AffinityTopologyVersion> affFut = new GridFutureAdapter<>();
+ fetchFut.init(false);
- fetchFut.listen(new IgniteInClosureX<IgniteInternalFuture<GridDhtAffinityAssignmentResponse>>() {
- @Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut)
- throws IgniteCheckedException {
- fetchAffinity(prev.topologyVersion(),
- prev.discoveryEvent(),
- prev.discoCache(),
- aff, (GridDhtAssignmentFetchFuture)fetchFut);
+ final GridFutureAdapter<AffinityTopologyVersion> affFut = new GridFutureAdapter<>();
- aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+ fetchFut.listen(new IgniteInClosureX<IgniteInternalFuture<GridDhtAffinityAssignmentResponse>>() {
+ @Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut)
+ throws IgniteCheckedException {
+ fetchAffinity(prev.topologyVersion(),
+ prev.discoveryEvent(),
+ prev.discoCache(),
+ aff,
+ (GridDhtAssignmentFetchFuture)fetchFut);
- affFut.onDone(fut.topologyVersion());
- }
- });
+ aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+
+ affFut.onDone(fut.topologyVersion());
+ }
+ });
- futs.add(affFut);
+ futs.add(affFut);
+ }
}
- else
+ else {
grpHolder = new CacheGroupHolder1(grp, null);
+ if (newAff) {
+ GridAffinityAssignmentCache aff = grpHolder.affinity();
+
+ if (!aff.lastVersionEquals(fut.topologyVersion())) {
+ List<List<ClusterNode>> assign = aff.calculate(fut.topologyVersion(),
+ fut.discoveryEvent(),
+ fut.discoCache());
+
+ aff.initialize(fut.topologyVersion(), assign);
+ }
+ }
+ }
+
CacheGroupHolder old = grpHolders.put(grpHolder.groupId(), grpHolder);
assert old == null : old;
@@ -1757,7 +1794,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
*/
public IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> initAffinityOnNodeLeft(
final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
- IgniteInternalFuture<?> initFut = initCoordinatorCaches(fut);
+ IgniteInternalFuture<?> initFut = initCoordinatorCaches(fut, false);
if (initFut != null && !initFut.isDone()) {
final GridFutureAdapter<Map<Integer, Map<Integer, List<UUID>>>> resFut = new GridFutureAdapter<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c712412/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 018537c..51214e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1438,7 +1438,24 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
return;
try {
- sendLocalPartitions(node, msg.exchangeId());
+ List<GridDhtPartitionsExchangeFuture> futs = exchangeFutures();
+
+ GridDhtPartitionsExchangeFuture fut = null;
+ // Find the future for the requested exchange, scanning backwards from the last list element.
+ for (int i = futs.size() - 1; i >= 0; i--) {
+ GridDhtPartitionsExchangeFuture fut0 = futs.get(i);
+
+ if (fut0.exchangeId().equals(msg.exchangeId())) {
+ fut = fut0;
+
+ break;
+ }
+ }
+
+ if (fut != null)
+ fut.processSinglePartitionRequest(node, msg);
+ else
+ sendLocalPartitions(node, msg.exchangeId());
}
finally {
leaveBusy();
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c712412/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 81b288c..4a39bae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -136,6 +136,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** */
private ClusterNode crd;
+ /** */
+ private boolean crdReady;
+
/** ExchangeFuture id. */
private final GridDhtPartitionExchangeId exchId;
@@ -169,7 +172,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* Messages received on non-coordinator are stored in case if this node
* becomes coordinator.
*/
- private final Map<ClusterNode, GridDhtPartitionsSingleMessage> singleMsgs = new ConcurrentHashMap8<>();
+ private final Map<UUID, GridDhtPartitionsSingleMessage> singleMsgs = new ConcurrentHashMap8<>();
/** Messages received from new coordinator. */
private final Map<ClusterNode, GridDhtPartitionsFullMessage> fullMsgs = new ConcurrentHashMap8<>();
@@ -224,6 +227,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
@GridToStringExclude
private ExchangeContext exchCtx;
+ /** */
+ private FinishState finishState;
+
/**
* @param cctx Cache context.
* @param busyLock Busy lock.
@@ -452,6 +458,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
boolean crdNode = crd != null && crd.isLocal();
+ if (crdNode)
+ crdReady = true;
+
exchLog.info("Started exchange init [topVer=" + topVer +
", crd=" + crdNode +
", evt=" + discoEvt.type() +
@@ -1434,44 +1443,44 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
return;
}
- processMessage(node, msg);
+ processMessage(node.id(), msg);
}
});
}
}
/**
- * @param node Sender node.
+ * @param nodeId Sender node ID.
* @param msg Message.
*/
- private void processMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
+ private void processMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) {
boolean allReceived = false;
boolean updateSingleMap = false;
synchronized (this) {
assert crd != null;
- if (crd.isLocal()) {
- if (remaining.remove(node.id())) {
+ if (crd.isLocal() && crdReady) {
+ if (remaining.remove(nodeId)) {
updateSingleMap = true;
pendingSingleUpdates++;
if (stateChangeExchange() && msg.getError() != null)
- changeGlobalStateExceptions.put(node.id(), msg.getError());
+ changeGlobalStateExceptions.put(nodeId, msg.getError());
allReceived = remaining.isEmpty();
}
}
else
- singleMsgs.put(node, msg);
+ singleMsgs.put(nodeId, msg);
}
if (updateSingleMap) {
try {
// Do not update partition map, in case cluster transitioning to inactive state.
if (!deactivateCluster())
- updatePartitionSingleMap(node, msg);
+ updatePartitionSingleMap(nodeId, msg);
}
finally {
synchronized (this) {
@@ -1747,17 +1756,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (!crd.equals(discoCache.serverNodes().get(0))) {
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- if (!grp.isLocal()) {
- if (localJoinExchange() && grp.affinity().lastVersion().topologyVersion() == -1L) {
- List<List<ClusterNode>> aff = grp.affinity().calculate(topologyVersion(),
- discoEvt,
- discoCache);
-
- grp.affinity().initialize(topologyVersion(), aff);
- }
-
+ if (!grp.isLocal())
grp.topology().beforeExchange(this, !centralizedAff);
- }
}
}
@@ -2013,12 +2013,37 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @param node Sender node.
* @param msg Message.
*/
+ public void processSinglePartitionRequest(final ClusterNode node, GridDhtPartitionsSingleRequest msg) {
+ if (!cctx.discovery().alive(node.id()))
+ return;
+
+ initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+ synchronized (GridDhtPartitionsExchangeFuture.this) { // Future's monitor, not the closure's.
+ if (finishState != null && node.id().equals(finishState.crdId))
+ return;
+ }
+
+ try {
+ sendLocalPartitions(node);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send message to coordinator: " + e, e);
+ }
+ }
+ });
+ }
+
+ /**
+ * @param node Sender node.
+ * @param msg Message.
+ */
private void processMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) {
assert exchId.equals(msg.exchangeId()) : msg;
assert msg.lastVersion() != null : msg;
synchronized (this) {
- if (crd == null)
+ if (crd == null || finishState != null)
return;
if (!crd.equals(node)) {
@@ -2031,6 +2056,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
return;
}
+
+ finishState = new FinishState(crd.id());
}
Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin();
@@ -2040,7 +2067,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Collection<CacheGroupAffinity> cachesAff = msg.cachesAffinity();
- assert !F.isEmpty(cachesAff) : cachesAff;
+ assert !F.isEmpty(cachesAff) : msg;
assert cachesAff.size() >= affReq.size();
int cnt = 0;
@@ -2153,11 +2180,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* Updates partition map in all caches.
*
- * @param node Node sent message.
+ * @param nodeId Node message received from.
* @param msg Partitions single message.
*/
- private void updatePartitionSingleMap(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
- msgs.put(node.id(), msg);
+ private void updatePartitionSingleMap(UUID nodeId, GridDhtPartitionsSingleMessage msg) {
+ msgs.put(nodeId, msg);
for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
Integer grpId = entry.getKey();
@@ -2282,7 +2309,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
try {
boolean crdChanged = false;
boolean allReceived = false;
- Set<UUID> reqFrom = null;
+ Set<UUID> remaining0 = null;
ClusterNode crd0;
@@ -2301,11 +2328,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
if (crd != null && crd.isLocal()) {
- if (rmvd)
+ if (crdChanged)
+ remaining0 = new HashSet<>(remaining);
+ else if (crdReady && rmvd)
allReceived = remaining.isEmpty();
-
- if (crdChanged && !remaining.isEmpty())
- reqFrom = new HashSet<>(remaining);
}
crd0 = crd;
@@ -2334,35 +2360,30 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (stateChangeExchange() && changeGlobalStateE != null)
changeGlobalStateExceptions.put(crd0.id(), changeGlobalStateE);
- if (allReceived) {
- awaitSingleMapUpdates();
-
- onAllReceived();
+ if (crdChanged) {
+ boolean newAff = localJoinExchange();
+
+ IgniteInternalFuture<?> fut = cctx.affinity().initCoordinatorCaches(
+ GridDhtPartitionsExchangeFuture.this, newAff);
+
+ if (fut == null || fut.isDone())
+ onBecomeCoordinator();
+ else {
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ onBecomeCoordinator();
+ }
+ });
+ }
return;
}
- if (crdChanged && reqFrom != null) {
- GridDhtPartitionsSingleRequest req = new GridDhtPartitionsSingleRequest(exchId);
-
- for (UUID nodeId : reqFrom) {
- try {
- // It is possible that some nodes finished exchange with previous coordinator.
- cctx.io().send(nodeId, req, SYSTEM_POOL);
- }
- catch (ClusterTopologyCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Node left during partition exchange [nodeId=" + nodeId +
- ", exchId=" + exchId + ']');
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to request partitions from node: " + nodeId, e);
- }
- }
- }
+ if (allReceived) {
+ awaitSingleMapUpdates();
- for (Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> m : singleMsgs.entrySet())
- processMessage(m.getKey(), m.getValue());
+ onAllReceived();
+ }
}
else {
if (crdChanged) {
@@ -2373,11 +2394,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
}
- catch (Exception e) {
+ catch (IgniteCheckedException e) {
if (reconnectOnError(e))
onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
else
- throw e;
+ U.error(log, "Failed to process node left event: " + e, e);
}
finally {
leaveBusy();
@@ -2391,6 +2412,51 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
+ * Marks local node ready as coordinator; requests missing single messages and replays stored ones, or completes.
+ */
+ private void onBecomeCoordinator() {
+ Set<UUID> remaining0 = null;
+
+ synchronized (this) {
+ assert crd != null && crd.isLocal();
+ assert !crdReady;
+
+ crdReady = true;
+
+ if (!remaining.isEmpty())
+ remaining0 = new HashSet<>(remaining);
+ }
+
+ if (remaining0 != null) {
+ // It is possible that some nodes finished exchange with previous coordinator.
+ GridDhtPartitionsSingleRequest req = new GridDhtPartitionsSingleRequest(exchId);
+
+ for (UUID nodeId : remaining0) {
+ try {
+ if (!singleMsgs.containsKey(nodeId))
+ cctx.io().send(nodeId, req, SYSTEM_POOL);
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Node left during partition exchange [nodeId=" + nodeId +
+ ", exchId=" + exchId + ']');
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to request partitions from node: " + nodeId, e);
+ }
+ }
+
+ for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> m : singleMsgs.entrySet())
+ processMessage(m.getKey(), m.getValue());
+ }
+ else {
+ awaitSingleMapUpdates();
+
+ onAllReceived();
+ }
+ }
+
+ /**
* @param e Exception.
* @return {@code True} if local node should try reconnect in case of error.
*/
@@ -2529,4 +2595,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
return nextTimeout <= limit ? nextTimeout : limit;
}
+
+ /**
+ * Holds the ID of the coordinator that was current when a full partitions message was accepted by this future.
+ */
+ private static class FinishState {
+ /** Coordinator node ID. */
+ private final UUID crdId;
+
+ /**
+ * @param crdId Coordinator node ID.
+ */
+ FinishState(UUID crdId) {
+ this.crdId = crdId;
+ }
+ }
}