You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/13 14:36:19 UTC
[50/50] ignite git commit: ignite-5578
ignite-5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c9ef68e3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c9ef68e3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c9ef68e3
Branch: refs/heads/ignite-5578
Commit: c9ef68e3f21aeec60ee2c8605f849b581a3d6ce4
Parents: 39cccec
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jul 13 14:59:35 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jul 13 17:35:17 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 138 ++++++------
.../processors/cache/ExchangeContext.java | 50 ++---
.../cache/ExchangeDiscoveryEvents.java | 52 ++++-
.../GridCachePartitionExchangeManager.java | 55 ++---
.../GridDhtPartitionsExchangeFuture.java | 219 +++++++++++--------
.../preloader/GridDhtPartitionsFullMessage.java | 4 +
.../CacheExchangeCoalescingTest.java | 73 -------
.../distributed/CacheExchangeMergeTest.java | 73 +++++++
8 files changed, 361 insertions(+), 303 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index a31ab1c..4ea61a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentMap;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -1232,38 +1233,45 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
- * @param crd Coordinator flag.
- * @throws IgniteCheckedException If failed.
+ * @param grpId Cache group ID.
+ * @return Affinity assignments.
*/
- public void onLocalJoin(boolean crd) throws IgniteCheckedException {
+ public GridAffinityAssignmentCache affinity(Integer grpId) {
+ CacheGroupHolder grpHolder = grpHolders.get(grpId);
+
+ assert grpHolder != null : debugGroupName(grpId);
+ return grpHolder.affinity();
}
- public void processDiscoveryEvents(ExchangeDiscoveryEvents evts) {
- AffinityTopologyVersion topVer = evts.topologyVersion();
+ public Map<Integer, Map<Integer, List<UUID>>> onTopologyChange(GridDhtPartitionsExchangeFuture fut,
+ boolean crd)
+ throws IgniteCheckedException
+ {
+ final ExchangeDiscoveryEvents evts = fut.context().events();
+
+ assert evts.serverJoin() || evts.serverLeft();
if (evts.serverLeft()) {
+ forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
+ @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
+ AffinityTopologyVersion topVer = evts.topologyVersion();
- }
- else if (evts.serverJoin()) {
+ CacheGroupHolder cache = groupHolder(topVer, desc);
- }
- else {
+ cache.affinity().calculate(topVer, evts.event(), evts.discoveryCache());
+ }
+ });
+ return initAffinityOnNodeLeft0(evts.topologyVersion(), fut);
}
- }
-
-
- /**
- * @param grpId Cache group ID.
- * @return Affinity assignments.
- */
- public GridAffinityAssignmentCache affinity(Integer grpId) {
- CacheGroupHolder grpHolder = grpHolders.get(grpId);
+ else {
+ WaitRebalanceInfo waitRebalanceInfo = initAffinityOnNodeJoin(evts, crd);
- assert grpHolder != null : debugGroupName(grpId);
+ setWaitRebalanceInfo(waitRebalanceInfo, evts.topologyVersion(), crd);
- return grpHolder.affinity();
+ return null;
+ }
}
/**
@@ -1299,25 +1307,23 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
else
fetchAffinityOnJoin(fut);
}
- else {
- waitRebalanceInfo = initAffinityOnNodeJoin(fut.topologyVersion(),
- fut.discoveryEvent(),
- fut.discoCache(),
- crd);
- }
+ else
+ waitRebalanceInfo = initAffinityOnNodeJoin(fut.context().events(), crd);
- synchronized (mux) {
- affCalcVer = fut.topologyVersion();
+ setWaitRebalanceInfo(waitRebalanceInfo, fut.topologyVersion(), crd);
+ }
- this.waitInfo = waitRebalanceInfo != null && !waitRebalanceInfo.empty() ? waitRebalanceInfo : null;
+ private void setWaitRebalanceInfo(WaitRebalanceInfo waitRebalanceInfo, AffinityTopologyVersion topVer, boolean crd) {
+ affCalcVer = topVer;
- WaitRebalanceInfo info = this.waitInfo;
+ this.waitInfo = waitRebalanceInfo != null && !waitRebalanceInfo.empty() ? waitRebalanceInfo : null;
- if (crd) {
- if (log.isDebugEnabled()) {
- log.debug("Computed new affinity after node join [topVer=" + fut.topologyVersion() +
- ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']');
- }
+ WaitRebalanceInfo info = this.waitInfo;
+
+ if (crd) {
+ if (log.isDebugEnabled()) {
+ log.debug("Computed new affinity after node join [topVer=" + topVer +
+ ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']');
}
}
}
@@ -1654,17 +1660,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
- * @param topVer Topology version.
- * @param evt Discovery event.
- * @param discoCache Discovery data cache.
* @param crd Coordinator flag.
* @throws IgniteCheckedException If failed.
* @return Rabalance info.
*/
- @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final AffinityTopologyVersion topVer,
- final DiscoveryEvent evt,
- final DiscoCache discoCache,
- boolean crd)
+ @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final ExchangeDiscoveryEvents evts, boolean crd)
throws IgniteCheckedException {
final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
@@ -1675,23 +1675,27 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
boolean latePrimary = grp.rebalanceEnabled();
- initAffinityOnNodeJoin(topVer, evt, discoCache, grp.affinity(), null, latePrimary, affCache);
+ initAffinityOnNodeJoin(evts,
+ evts.groupAddedOnExchange(grp.groupId(), grp.receivedFrom()),
+ grp.affinity(),
+ null,
+ latePrimary,
+ affCache);
}
return null;
}
else {
- final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer);
+ final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(evts.topologyVersion());
forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
@Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
- CacheGroupHolder cache = groupHolder(topVer, desc);
+ CacheGroupHolder cache = groupHolder(evts.topologyVersion(), desc);
boolean latePrimary = cache.rebalanceEnabled;
- initAffinityOnNodeJoin(topVer,
- evt,
- discoCache,
+ initAffinityOnNodeJoin(evts,
+ evts.groupAddedOnExchange(desc.groupId(), desc.receivedFrom()),
cache.affinity(),
waitRebalanceInfo,
latePrimary,
@@ -1704,24 +1708,33 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
- * @param topVer Topology version.
- * @param evt Discovery event.
- * @param discoCache Discovery data cache.
* @param aff Affinity.
* @param rebalanceInfo Rebalance information.
* @param latePrimary If {@code true} delays primary assignment if it is not owner.
* @param affCache Already calculated assignments (to reduce data stored in history).
* @throws IgniteCheckedException If failed.
*/
- private void initAffinityOnNodeJoin(AffinityTopologyVersion topVer,
- DiscoveryEvent evt,
- DiscoCache discoCache,
+ private void initAffinityOnNodeJoin(
+ ExchangeDiscoveryEvents evts,
+ boolean addedOnExchnage,
GridAffinityAssignmentCache aff,
WaitRebalanceInfo rebalanceInfo,
boolean latePrimary,
Map<Object, List<List<ClusterNode>>> affCache)
throws IgniteCheckedException
{
+ if (addedOnExchnage) {
+ if (aff.lastVersion().equals(AffinityTopologyVersion.NONE)) {
+ List<List<ClusterNode>> newAff = aff.calculate(evts.topologyVersion(),
+ evts.event(),
+ evts.discoveryCache());
+
+ aff.initialize(evts.topologyVersion(), newAff);
+ }
+
+ return;
+ }
+
AffinityTopologyVersion affTopVer = aff.lastVersion();
assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [grp=" + aff.cacheOrGroupName() +
@@ -1731,7 +1744,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert aff.idealAssignment() != null : "Previous assignment is not available.";
- List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, evt, discoCache);
+ List<List<ClusterNode>> idealAssignment = aff.calculate(evts.topologyVersion(), evts.event(), evts.discoveryCache());
List<List<ClusterNode>> newAssignment = null;
if (latePrimary) {
@@ -1743,7 +1756,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
ClusterNode newPrimary = newNodes.size() > 0 ? newNodes.get(0) : null;
if (curPrimary != null && newPrimary != null && !curPrimary.equals(newPrimary)) {
- assert cctx.discovery().node(topVer, curPrimary.id()) != null : curPrimary;
+ assert cctx.discovery().node(evts.topologyVersion(), curPrimary.id()) != null : curPrimary;
List<ClusterNode> nodes0 = latePrimaryAssignment(aff,
p,
@@ -1762,7 +1775,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (newAssignment == null)
newAssignment = idealAssignment;
- aff.initialize(topVer, cachedAssignment(aff, newAssignment, affCache));
+ aff.initialize(evts.topologyVersion(), cachedAssignment(aff, newAssignment, affCache));
}
/**
@@ -1834,7 +1847,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
initFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> initFut) {
try {
- resFut.onDone(initAffinityOnNodeLeft0(fut));
+ resFut.onDone(initAffinityOnNodeLeft0(fut.topologyVersion(), fut));
}
catch (IgniteCheckedException e) {
resFut.onDone(e);
@@ -1845,7 +1858,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
return resFut;
}
else
- return new GridFinishedFuture<>(initAffinityOnNodeLeft0(fut));
+ return new GridFinishedFuture<>(initAffinityOnNodeLeft0(fut.topologyVersion(), fut));
}
/**
@@ -1853,10 +1866,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @return Affinity assignment.
* @throws IgniteCheckedException If failed.
*/
- private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut)
+ private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final AffinityTopologyVersion topVer,
+ final GridDhtPartitionsExchangeFuture fut)
throws IgniteCheckedException {
- final AffinityTopologyVersion topVer = fut.topologyVersion();
-
final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer);
final Collection<ClusterNode> aliveNodes = cctx.discovery().nodes(topVer);
@@ -1865,7 +1877,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
@Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
- CacheGroupHolder grpHolder = groupHolder(fut.topologyVersion(), desc);
+ CacheGroupHolder grpHolder = groupHolder(topVer, desc);
if (!grpHolder.rebalanceEnabled)
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index eeb7b23..1d7b73a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -17,14 +17,9 @@
package org.apache.ignite.internal.processors.cache;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
import java.util.Set;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.jetbrains.annotations.Nullable;
@@ -39,24 +34,25 @@ public class ExchangeContext {
private boolean fetchAffOnJoin;
/** */
- private final boolean coalescing;
+ private final boolean merge;
/** */
- private AffinityTopologyVersion resTopVer;
-
- /** */
- private final Map<Integer, List<List<ClusterNode>>> affMap = new HashMap<>();
+ private final ExchangeDiscoveryEvents evts;
/**
* @param protocolVer Protocol version.
- * @param topVer Topology version.
+ * @param fut Exchange future.
*/
- public ExchangeContext(int protocolVer, AffinityTopologyVersion topVer) {
+ public ExchangeContext(int protocolVer, GridDhtPartitionsExchangeFuture fut) {
fetchAffOnJoin = protocolVer == 1;
- coalescing = protocolVer > 1;
+ merge = protocolVer > 1;
+
+ evts = new ExchangeDiscoveryEvents(fut);
+ }
- this.resTopVer = topVer;
+ public ExchangeDiscoveryEvents events() {
+ return evts;
}
/**
@@ -84,27 +80,7 @@ public class ExchangeContext {
return requestGrpsAffOnJoin;
}
- public boolean coalescing() {
- return coalescing;
- }
-
- public List<List<ClusterNode>> activeAffinity(GridCacheSharedContext cctx, GridAffinityAssignmentCache aff) {
- List<List<ClusterNode>> assignment = affMap.get(aff.groupId());
-
- if (assignment != null)
- return assignment;
-
- AffinityTopologyVersion affTopVer = aff.lastVersion();
-
- assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [grp=" + aff.cacheOrGroupName() +
- ", topVer=" + affTopVer + ", node=" + cctx.localNodeId() + ']';
-
- List<List<ClusterNode>> curAff = aff.assignments(affTopVer);
-
- assert aff.idealAssignment() != null : "Previous assignment is not available.";
-
- affMap.put(aff.groupId(), curAff);
-
- return curAff;
+ public boolean canMergeExchanges() {
+ return merge;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
index fced92e..7d3e256 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
@@ -17,13 +17,19 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import static com.sun.corba.se.impl.util.RepositoryId.cache;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -39,6 +45,12 @@ public class ExchangeDiscoveryEvents {
private DiscoCache discoCache;
/** */
+ private DiscoveryEvent evt;
+
+ /** */
+ private List<DiscoveryEvent> evts = new ArrayList<>();
+
+ /** */
private boolean srvJoin;
/** */
@@ -47,18 +59,36 @@ public class ExchangeDiscoveryEvents {
/**
* @param fut Future.
*/
- void init(GridDhtPartitionsExchangeFuture fut) {
- topVer = fut.topologyVersion();
- discoCache = fut.discoCache();
+ ExchangeDiscoveryEvents(GridDhtPartitionsExchangeFuture fut) {
+ addEvent(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+ }
- ClusterNode node = fut.discoveryEvent().eventNode();
+ boolean groupAddedOnExchange(int grpId, UUID rcvdFrom) {
+ for (DiscoveryEvent evt : evts) {
+ if (evt.type() == EVT_NODE_JOINED && rcvdFrom.equals(evt.eventNode().id()))
+ return true;
+ }
- if (fut.discoveryEvent().type()== EVT_NODE_JOINED)
- srvJoin = !CU.clientNode(node);
- else {
- assert fut.discoveryEvent().type() == EVT_NODE_LEFT || fut.discoveryEvent().type() == EVT_NODE_FAILED;
+ return false;
+ }
- srvLeft = !CU.clientNode(node);
+ void addEvent(AffinityTopologyVersion topVer, DiscoveryEvent evt, DiscoCache cache) {
+ evts.add(evt);
+
+ this.topVer = topVer;
+ this.evt = evt;
+ this.discoCache = cache;
+
+ ClusterNode node = evt.eventNode();
+
+ if (!CU.clientNode(node)) {
+ if (evt.type()== EVT_NODE_JOINED)
+ srvJoin = true;
+ else {
+ assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
+
+ srvLeft = !CU.clientNode(node);
+ }
}
}
@@ -66,6 +96,10 @@ public class ExchangeDiscoveryEvents {
return discoCache;
}
+ DiscoveryEvent event() {
+ return evt;
+ }
+
AffinityTopologyVersion topologyVersion() {
return topVer;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 135b771..4e42bf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1738,28 +1738,28 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
((IgniteDiagnosticAware)fut).addDiagnosticRequest(ctx);
}
- private boolean supportsCoalescing(ClusterNode node) {
+ private boolean supportsMergeExchanges(ClusterNode node) {
return exchangeProtocolVersion(node.version()) > 1;
}
/** */
- private volatile AffinityTopologyVersion coalesceTestWaitVer;
+ private volatile AffinityTopologyVersion exchMergeTestWaitVer;
/**
- * @param coalesceTestWaitVer
+ * For testing only.
+ *
+ * @param exchMergeTestWaitVer Version to wait for.
*/
- public void coalesceTestWaitVersion(AffinityTopologyVersion coalesceTestWaitVer) {
- this.coalesceTestWaitVer = coalesceTestWaitVer;
+ public void mergeExchangesTestWaitVersion(AffinityTopologyVersion exchMergeTestWaitVer) {
+ this.exchMergeTestWaitVer = exchMergeTestWaitVer;
}
- public ExchangeDiscoveryEvents coalesceExchanges(GridDhtPartitionsExchangeFuture curFut) {
- ExchangeDiscoveryEvents evts = null;
-
- AffinityTopologyVersion coalesceTestWaitVer = this.coalesceTestWaitVer;
+ public boolean mergeExchanges(GridDhtPartitionsExchangeFuture curFut) {
+ AffinityTopologyVersion exchMergeTestWaitVer = this.exchMergeTestWaitVer;
- if (coalesceTestWaitVer != null) {
+ if (exchMergeTestWaitVer != null) {
log.info("Coalesce test, waiting for version [exch=" + curFut.topologyVersion() +
- ", waitVer=" + coalesceTestWaitVer + ']');
+ ", waitVer=" + exchMergeTestWaitVer + ']');
long end = U.currentTimeMillis() + 10_000;
@@ -1770,8 +1770,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (task instanceof GridDhtPartitionsExchangeFuture) {
GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
- if (coalesceTestWaitVer.equals(fut.topologyVersion())) {
- log.info("Coalesce test, found awaited version: " + coalesceTestWaitVer);
+ if (exchMergeTestWaitVer.equals(fut.topologyVersion())) {
+ log.info("Coalesce test, found awaited version: " + exchMergeTestWaitVer);
found = true;
@@ -1793,40 +1793,31 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
- for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
- if (task instanceof GridDhtPartitionsExchangeFuture) {
- GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
+ synchronized (curFut) {
+ int awaited = 0;
- int evtType = fut.discoveryEvent().type();
+ for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
+ if (task instanceof GridDhtPartitionsExchangeFuture) {
+ GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
- if (evtType == EVT_NODE_JOINED) {
DiscoveryEvent evt = fut.discoveryEvent();
ClusterNode node = evt.eventNode();
- if (!supportsCoalescing(node))
+ if (!supportsMergeExchanges(node))
break;
- fut.mergeWithFuture(curFut);
-
- if (evts == null)
- evts = new ExchangeDiscoveryEvents();
-
- evts.init(fut);
+ if (evt.type() == EVT_NODE_JOINED && !CU.clientNode(node))
+ fut.mergeServerJoinExchange(curFut);
exchWorker.futQ.remove(fut);
}
else
break;
-// else if (evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED) {
-//
-// }
}
- else
- break;
- }
- return evts;
+ return awaited == 0;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 1ec3d73..463e330 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -63,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.ExchangeContext;
+import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -210,6 +212,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new ConcurrentHashMap8<>();
/** */
+ private Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs;
+
+ /** */
+ private int awaitMergedMsgs;
+
+ /** */
@GridToStringExclude
private volatile IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
@@ -444,7 +452,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
assert exchId.nodeId().equals(discoEvt.eventNode().id()) : this;
exchCtx = new ExchangeContext(cctx.exchange().exchangeProtocolVersion(discoCache.minimumNodeVersion()),
- topologyVersion());
+ this);
try {
discoCache.updateAlives(cctx.discovery());
@@ -497,51 +505,26 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
else {
- if (exchCtx.coalescing()) {
-// if (discoEvt.type() == EVT_NODE_JOINED) {
-// if (discoEvt.eventNode().isLocal()) {
-// localJoin();
-//
-// if (crdNode) {
-// exchange = ExchangeType.NONE;
-// }
-// else
-// sendLocalJoinMessage(crd);
-// }
-// else {
-// if (CU.clientNode(discoEvt.eventNode())) {
-// onClientNodeEvent(crdNode);
-//
-// exchange = ExchangeType.NONE;
-// }
-// else {
-// if (cctx.kernalContext().clientNode())
-// exchange = ExchangeType.CLIENT;
-// else {
-//
-// }
-// }
-// }
-// }
-// else {
-//
-// }
- }
- else {
- if (discoEvt.type() == EVT_NODE_JOINED) {
- if (!discoEvt.eventNode().isLocal()) {
- Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(
- discoEvt.eventNode().id(),
- topVer);
+ if (discoEvt.type() == EVT_NODE_JOINED) {
+ if (!discoEvt.eventNode().isLocal()) {
+ Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(
+ discoEvt.eventNode().id(),
+ topVer);
- cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
- }
- else
- initCachesOnLocalJoin();
+ cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
}
+ else
+ initCachesOnLocalJoin();
+ }
- exchange = CU.clientNode(discoEvt.eventNode()) ?
- onClientNodeEvent(crdNode) :
+ if (exchCtx.canMergeExchanges()) {
+ if (cctx.kernalContext().clientNode() || CU.clientNode(discoEvt.eventNode()))
+ exchange = onClientNodeEvent(crdNode);
+ else
+ exchange = ExchangeType.ALL_2;
+ }
+ else {
+ exchange = CU.clientNode(discoEvt.eventNode()) ? onClientNodeEvent(crdNode) :
onServerNodeEvent(crdNode);
}
}
@@ -556,6 +539,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
break;
}
+ case ALL_2: {
+ distributedExchange2();
+
+ break;
+ }
+
case CLIENT: {
initTopologies();
@@ -874,6 +863,20 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* @throws IgniteCheckedException If failed.
*/
+ private void distributedExchange2() throws IgniteCheckedException {
+ if (crd.isLocal()) {
+ if (remaining.isEmpty())
+ onAllReceived();
+ }
+ else
+ sendPartitions(crd);
+
+ initDone();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
private void distributedExchange() throws IgniteCheckedException {
assert crd != null;
@@ -914,11 +917,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
cctx.database().beforeExchange(this);
-//
-// ExchangeDiscoveryEvents mergedEvts = null;
-//
-// if (crd.isLocal())
-// mergedEvts = cctx.exchange().coalesceExchanges(this);
if (crd.isLocal()) {
if (remaining.isEmpty())
@@ -1453,59 +1451,63 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private GridDhtPartitionsExchangeFuture mergedWith;
/** */
- private List<T2<UUID, GridDhtPartitionsSingleMessage>> pendingMsgs;
+ private GridDhtPartitionsSingleMessage pendingSingleMsg;
+
+ /** */
+ private Map<ClusterNode, GridDhtPartitionsSingleMessage> pendingClientMsgs;
+
+ private void addMergedJoinExchange(UUID nodeId, GridDhtPartitionsSingleMessage msg) {
+ if (mergedJoinExchMsgs == null)
+ mergedJoinExchMsgs = new LinkedHashMap<>();
+
+ if (msg != null)
+ mergedJoinExchMsgs.put(nodeId, msg);
+ else {
+ if (cctx.discovery().alive(nodeId))
+ awaitMergedMsgs++;
+ else
+ mergedJoinExchMsgs.put(nodeId, null);
+ }
+ }
/**
* @param fut Current exchange to merge with.
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
- public void mergeWithFuture(final GridDhtPartitionsExchangeFuture fut) {
+ public void mergeServerJoinExchange(final GridDhtPartitionsExchangeFuture fut) {
log.info("Merge exchange future [fut=" + topologyVersion() + ", mergeWith=" + fut.topologyVersion() + ']');
- List<T2<UUID, GridDhtPartitionsSingleMessage>> pendingMsgs = null;
-
synchronized (this) {
- synchronized (fut) {
- assert !isDone();
- assert !initFut.isDone();
- assert mergedWith == null;
+ assert !isDone();
+ assert !initFut.isDone();
+ assert mergedWith == null;
+ assert !CU.clientNode(discoEvt.eventNode()) : discoEvt;
- mergedWith = fut;
+ mergedWith = fut;
- if (this.pendingMsgs != null) {
- pendingMsgs = this.pendingMsgs;
+ fut.addMergedJoinExchange(discoEvt.eventNode().id(), pendingSingleMsg);
- T2<UUID, GridDhtPartitionsSingleMessage> joinedSrvMsg = null;
+ // TODO 5578 client messages.
+ }
+ }
- if (discoEvt.type() == EVT_NODE_JOINED && !CU.clientNode(discoEvt.eventNode())) {
- for (Iterator<T2<UUID, GridDhtPartitionsSingleMessage>> it = pendingMsgs.iterator(); it.hasNext();) {
- T2<UUID, GridDhtPartitionsSingleMessage> msg = it.next();
+ void onReceiveMerged(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
+ boolean done = false;
- if (msg.get1().equals(discoEvt.eventNode().id())) {
- joinedSrvMsg = msg;
+ synchronized (this) {
+ if (mergedJoinExchMsgs != null && !mergedJoinExchMsgs.containsKey(node.id())) {
+ mergedJoinExchMsgs.put(node.id(), msg);
- it.remove();
+ assert awaitMergedMsgs > 0 : awaitMergedMsgs;
- break;
- }
- }
+ awaitMergedMsgs--;
- if (pendingMsgs.isEmpty())
- pendingMsgs = null;
- }
- }
+ done = awaitMergedMsgs == 0;
}
}
- if (pendingMsgs != null) {
- final List<T2<UUID, GridDhtPartitionsSingleMessage>> pendingMsgs0 = pendingMsgs;
+ if (done) {
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut0) {
- for (T2<UUID, GridDhtPartitionsSingleMessage> msg : pendingMsgs0)
- fut.processSingleMessage(msg.get1(), msg.get2());
- }
- });
}
}
@@ -1516,7 +1518,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
public void onReceive(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
assert msg != null;
assert exchId.equals(msg.exchangeId()) : msg;
- assert msg.lastVersion() != null : msg;
if (isDone()) {
if (log.isDebugEnabled())
@@ -1537,15 +1538,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (mergedWith != null)
mergedWith0 = mergedWith;
else {
- if (pendingMsgs == null)
- pendingMsgs = new ArrayList<>();
-
- pendingMsgs.add(new T2<>(node.id(), msg));
+ if (msg.client()) {
+ assert false;
+ }
+ else if (exchangeId().isJoined() && node.id().equals(exchId.nodeId()))
+ pendingSingleMsg = msg;
}
}
if (mergedWith0 != null) {
- mergedWith0.onReceive(node, msg);
+ mergedWith0.onReceiveMerged(node, msg);
return;
}
@@ -1857,6 +1859,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
+ if (exchCtx.canMergeExchanges()) {
+ cctx.exchange().mergeExchanges(this);
+
+ cctx.affinity().onTopologyChange(this, true);
+
+ for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+ if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
+ continue;
+
+ grp.topology().beforeExchange(this, true);
+ }
+ }
+
Map<Integer, CacheGroupAffinityMessage> cachesAff = null;
for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
@@ -2160,7 +2175,24 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
return;
}
- finishState = new FinishState(crd.id());
+ finishState = new FinishState(crd.id(), msg.resultTopologyVersion());
+ }
+
+ if (exchCtx.canMergeExchanges()) {
+ try {
+ onServerNodeEvent(true);
+
+ for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+ if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
+ continue;
+
+ grp.topology().beforeExchange(this, true);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ // TODO 5578.
+ U.error(log, "Failed: " + e, e);
+ }
}
Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin();
@@ -2568,8 +2600,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
enum ExchangeType {
/** */
CLIENT,
+
/** */
ALL,
+
+ /** */
+ ALL_2,
+
/** */
NONE
}
@@ -2680,11 +2717,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** */
private final UUID crdId;
+ /** */
+ private final AffinityTopologyVersion topVer;
+
/**
* @param crdId Coordinator node.
*/
- FinishState(UUID crdId) {
+ FinishState(UUID crdId, AffinityTopologyVersion topVer) {
this.crdId = crdId;
+ this.topVer = topVer;
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 4c79c3b..8a5dbbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -154,6 +154,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
cp.cachesAff = cachesAff;
}
+ AffinityTopologyVersion resultTopologyVersion() {
+ return resTopVer;
+ }
+
/**
* @param cachesAff Affinity.
* @return Message copy.
http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java
deleted file mode 100644
index dbd3971..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- *
- */
-public class CacheExchangeCoalescingTest extends GridCommonAbstractTest {
- /** */
- private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
- /** */
- private boolean client;
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
-
- cfg.setClientMode(client);
-
- return cfg;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testConcurrentJoin1() throws Exception {
- IgniteEx srv0 = startGrid(0);
-
- srv0.context().cache().context().exchange().coalesceTestWaitVersion(new AffinityTopologyVersion(3, 0));
-
- final AtomicInteger idx = new AtomicInteger(1);
-
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
- @Override public Void call() throws Exception {
- startGrid(idx.getAndIncrement());
-
- return null;
- }
- }, 2, "start-node");
-
- fut.get();
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
new file mode 100644
index 0000000..ef8a1da
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Starts server nodes concurrently after calling {@code mergeExchangesTestWaitVersion} on the first node's exchange manager.
+ */
+public class CacheExchangeMergeTest extends GridCommonAbstractTest {
+ /** Static IP finder installed on the discovery SPI of every configuration built by this test. */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Value passed to {@code setClientMode}; never assigned in this class, so it keeps the default {@code false}. */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testConcurrentJoin1() throws Exception {
+ IgniteEx srv0 = startGrid(0);
+
+ srv0.context().cache().context().exchange().mergeExchangesTestWaitVersion(new AffinityTopologyVersion(3, 0)); // NOTE(review): (3, 0) is presumably the version reached after both joins below - confirm.
+
+ final AtomicInteger idx = new AtomicInteger(1); // First index handed out is 1; index 0 is already used by srv0.
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ startGrid(idx.getAndIncrement());
+
+ return null;
+ }
+ }, 2, "start-node"); // Two threads, so grids 1 and 2 are started concurrently.
+
+ fut.get(); // Wait for both start-up threads to finish.
+ }
+}