You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/03 12:55:41 UTC
[1/4] ignite git commit: ignite-5578
Repository: ignite
Updated Branches:
refs/heads/ignite-5578 8beacecfd -> d4030225a
ignite-5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6b6ab179
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6b6ab179
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6b6ab179
Branch: refs/heads/ignite-5578
Commit: 6b6ab179837dbaa6e1f0d380d08754e8649aea32
Parents: 8beacec
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 3 12:41:37 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 3 12:41:37 2017 +0300
----------------------------------------------------------------------
.../internal/processors/closure/GridClosureProcessor.java | 10 ++++------
1 file changed, 4 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6b6ab179/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index cf4ea81..2ecea07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -862,8 +862,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/**
- * Executes closure on system pool. Companion to {@link #runLocal(Runnable, boolean)} but
- * in case of rejected execution re-runs the closure in the current thread (blocking).
+ * Executes closure on system pool. In case of rejected execution re-runs the closure in the current
+ * thread (blocking).
*
* @param c Closure to execute.
* @return Future.
@@ -873,8 +873,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/**
- * Companion to {@link #runLocal(Runnable, boolean)} but in case of rejected execution re-runs
- * the closure in the current thread (blocking).
+ * In case of rejected execution re-runs the closure in the current thread (blocking).
*
* @param c Closure to execute.
* @param sys If {@code true}, then system pool will be used, otherwise public pool will be used.
@@ -885,8 +884,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/**
- * Companion to {@link #runLocal(Runnable, boolean)} but in case of rejected execution re-runs
- * the closure in the current thread (blocking).
+ * In case of rejected execution re-runs the closure in the current thread (blocking).
*
* @param c Closure to execute.
* @param plc Policy to choose executor pool.
[3/4] ignite git commit: ignite-5578
Posted by sb...@apache.org.
ignite-5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e3ecc169
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e3ecc169
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e3ecc169
Branch: refs/heads/ignite-5578
Commit: e3ecc169ceab543748c9ba3ef409540697cf2529
Parents: b1579ea
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 3 15:45:23 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 3 15:45:23 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 140 ++++++++-----------
.../processors/cache/ExchangeContext.java | 4 +-
.../cache/ExchangeDiscoveryEvents.java | 6 +-
.../GridCachePartitionExchangeManager.java | 26 ++--
.../dht/GridClientPartitionTopology.java | 2 +-
.../dht/GridDhtPartitionTopologyImpl.java | 4 +-
.../GridDhtPartitionsExchangeFuture.java | 139 +++++++++---------
.../dht/preloader/InitNewCoordinatorFuture.java | 2 +-
.../GridCacheDatabaseSharedManager.java | 2 +-
.../CacheLateAffinityAssignmentTest.java | 2 +-
10 files changed, 154 insertions(+), 173 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index e6f5d6b..dc905a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -759,7 +759,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
evts.topologyVersion());
if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) {
- if (fut.discoCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty())
+ if (fut.events().discoveryCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty())
U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
}
}
@@ -957,7 +957,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
IgniteUuid deploymentId = desc.deploymentId();
if (!deploymentId.equals(deploymentIds.get(aff.groupId()))) {
- aff.clientEventTopologyChange(exchFut.discoveryEvent(), topVer);
+ aff.clientEventTopologyChange(exchFut.firstEvent(), topVer);
return;
}
@@ -991,7 +991,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
aff.initialize(topVer, cachedAssignment(aff, assignment, affCache));
}
else
- aff.clientEventTopologyChange(exchFut.discoveryEvent(), topVer);
+ aff.clientEventTopologyChange(exchFut.firstEvent(), topVer);
}
});
}
@@ -1004,14 +1004,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @throws IgniteCheckedException If failed.
*/
public void onClientEvent(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
- boolean locJoin = fut.discoveryEvent().eventNode().isLocal();
+ boolean locJoin = fut.firstEvent().eventNode().isLocal();
if (!locJoin) {
forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
@Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
AffinityTopologyVersion topVer = fut.initialVersion();
- aff.clientEventTopologyChange(fut.discoveryEvent(), topVer);
+ aff.clientEventTopologyChange(fut.firstEvent(), topVer);
}
});
}
@@ -1111,11 +1111,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert old == null : old;
- List<List<ClusterNode>> newAff = grpHolder.affinity().calculate(fut.initialVersion(),
- fut.discoveryEvent(),
- fut.discoCache());
-
- grpHolder.affinity().initialize(fut.initialVersion(), newAff);
+ calculateAndInit(fut.events(), grpHolder.affinity(), fut.initialVersion());
}
else if (grpHolder.client() && grp != null) {
assert grpHolder.affinity().idealAssignment() != null;
@@ -1149,12 +1145,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
@Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
CacheGroupHolder cache = groupHolder(fut.initialVersion(), desc);
- if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) {
- List<List<ClusterNode>> assignment =
- cache.affinity().calculate(fut.initialVersion(), fut.discoveryEvent(), fut.discoCache());
-
- cache.affinity().initialize(fut.initialVersion(), assignment);
- }
+ if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE))
+ calculateAndInit(fut.events(), cache.affinity(), fut.initialVersion());
}
});
}
@@ -1182,13 +1174,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
ExchangeDiscoveryEvents evts = fut.context().events();
- if (canCalculateAffinity(desc, aff, fut)) {
- List<List<ClusterNode>> assignment = aff.calculate(evts.topologyVersion(),
- evts.lastEvent(),
- evts.discoveryCache());
-
- aff.initialize(evts.topologyVersion(), assignment);
- }
+ if (canCalculateAffinity(desc, aff, fut))
+ calculateAndInit(evts, aff, evts.topologyVersion());
else {
GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
desc.groupId(),
@@ -1220,7 +1207,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
return true;
// If local node did not initiate exchange or local node is the only cache node in grid.
- Collection<ClusterNode> affNodes = fut.discoCache().cacheGroupAffinityNodes(aff.groupId());
+ Collection<ClusterNode> affNodes = fut.events().discoveryCache().cacheGroupAffinityNodes(aff.groupId());
return fut.cacheGroupAddedOnExchange(aff.groupId(), desc.receivedFrom()) ||
!fut.exchangeId().nodeId().equals(cctx.localNodeId()) ||
@@ -1330,13 +1317,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
aff.initialize(evts.topologyVersion(), assignments);
}
- else if (fut.cacheGroupAddedOnExchange(aff.groupId(), grp.receivedFrom())) {
- List<List<ClusterNode>> assignment = aff.calculate(evts.topologyVersion(),
- evts.lastEvent(),
- evts.discoveryCache());
-
- aff.initialize(evts.topologyVersion(), assignment);
- }
+ else if (fut.cacheGroupAddedOnExchange(aff.groupId(), grp.receivedFrom()))
+ calculateAndInit(evts, aff, evts.topologyVersion());
grp.topology().initPartitionsWhenAffinityReady(resTopVer, fut);
}
@@ -1412,9 +1394,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @throws IgniteCheckedException If failed.
*/
public void onServerJoin(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
- assert !fut.discoveryEvent().eventNode().isClient();
+ assert !fut.firstEvent().eventNode().isClient();
- boolean locJoin = fut.discoveryEvent().eventNode().isLocal();
+ boolean locJoin = fut.firstEvent().eventNode().isLocal();
WaitRebalanceInfo waitRebalanceInfo = null;
@@ -1424,13 +1406,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
@Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
AffinityTopologyVersion topVer = fut.initialVersion();
- CacheGroupHolder cache = groupHolder(topVer, desc);
-
- List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer,
- fut.discoveryEvent(),
- fut.discoCache());
+ CacheGroupHolder grpHolder = groupHolder(topVer, desc);
- cache.affinity().initialize(topVer, newAff);
+ calculateAndInit(fut.events(), grpHolder.affinity(), topVer);
}
});
}
@@ -1485,6 +1463,22 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
+ * @param evts Discovery events.
+ * @param aff Affinity.
+ * @param topVer Topology version.
+ */
+ private void calculateAndInit(ExchangeDiscoveryEvents evts,
+ GridAffinityAssignmentCache aff,
+ AffinityTopologyVersion topVer)
+ {
+ List<List<ClusterNode>> assignment = aff.calculate(topVer,
+ evts.lastEvent(),
+ evts.discoveryCache());
+
+ aff.initialize(topVer, assignment);
+ }
+
+ /**
* @param fut Exchange future.
* @throws IgniteCheckedException If failed.
*/
@@ -1498,14 +1492,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
continue;
if (fut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom())) {
- // Do not calculate affinity since it can change in case of exchange merge.
- if (!fut.context().mergeExchanges()) {
- List<List<ClusterNode>> assignment = grp.affinity().calculate(topVer,
- fut.discoveryEvent(),
- fut.discoCache());
-
- grp.affinity().initialize(topVer, assignment);
- }
+ // If merge is allowed, do not calculate affinity here since it can change on exchange end.
+ if (!fut.context().mergeExchanges())
+ calculateAndInit(fut.events(), grp.affinity(), topVer);
}
else {
if (fut.context().fetchAffinityOnJoin()) {
@@ -1516,22 +1505,17 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
grpDesc.groupId(),
topVer,
- fut.discoCache());
+ fut.events().discoveryCache());
fetchFut.init(false);
fetchFuts.add(fetchFut);
}
else {
- if (fut.discoCache().serverNodes().size() > 0)
+ if (fut.events().discoveryCache().serverNodes().size() > 0)
fut.context().addGroupAffinityRequestOnJoin(grp.groupId());
- else {
- List<List<ClusterNode>> aff = grp.affinity().calculate(topVer,
- fut.discoveryEvent(),
- fut.discoCache());
-
- grp.affinity().initialize(topVer, aff);
- }
+ else
+ calculateAndInit(fut.events(), grp.affinity(), topVer);
}
}
}
@@ -1542,8 +1526,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
int grpId = fetchFut.groupId();
fetchAffinity(topVer,
- fut.discoveryEvent(),
- fut.discoCache(),
+ fut.events().lastEvent(),
+ fut.events().discoveryCache(),
cctx.cache().cacheGroup(grpId).affinity(),
fetchFut);
}
@@ -1603,7 +1587,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @return {@code True} if affinity should be assigned by coordinator.
*/
public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
- ClusterNode leftNode = fut.discoveryEvent().eventNode();
+ ClusterNode leftNode = fut.firstEvent().eventNode();
assert !leftNode.isClient() : leftNode;
@@ -1613,14 +1597,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
@Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
CacheGroupHolder cache = groupHolder(fut.initialVersion(), desc);
- cache.aff.calculate(fut.initialVersion(), fut.discoveryEvent(), fut.discoCache());
+ cache.aff.calculate(fut.initialVersion(), fut.firstEvent(), fut.firstEventCache());
}
});
}
else {
forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
@Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
- aff.calculate(fut.initialVersion(), fut.discoveryEvent(), fut.discoCache());
+ aff.calculate(fut.initialVersion(), fut.firstEvent(), fut.firstEventCache());
}
});
}
@@ -1670,12 +1654,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
final GridAffinityAssignmentCache aff = grpHolder.affinity();
if (newAff) {
- if (!aff.lastVersion().equals(topVer)) {
- List<List<ClusterNode>> assign =
- aff.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
-
- aff.initialize(topVer, assign);
- }
+ if (!aff.lastVersion().equals(topVer))
+ calculateAndInit(fut.events(), aff, topVer);
grpHolder.topology().beforeExchange(fut, true, false);
}
@@ -1700,7 +1680,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
desc.groupId(),
prev.topologyVersion(),
- prev.discoCache());
+ prev.events().discoveryCache());
fetchFut.init(false);
@@ -1710,12 +1690,12 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
@Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut)
throws IgniteCheckedException {
fetchAffinity(prev.topologyVersion(),
- prev.discoveryEvent(),
- prev.discoCache(),
+ prev.events().lastEvent(),
+ prev.events().discoveryCache(),
aff,
(GridDhtAssignmentFetchFuture)fetchFut);
- aff.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
+ aff.calculate(topVer, fut.events().lastEvent(), fut.events().discoveryCache());
affFut.onDone(topVer);
}
@@ -1730,13 +1710,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (newAff) {
GridAffinityAssignmentCache aff = grpHolder.affinity();
- if (!aff.lastVersion().equals(topVer)) {
- List<List<ClusterNode>> assign = aff.calculate(topVer,
- fut.discoveryEvent(),
- fut.discoCache());
-
- aff.initialize(topVer, assign);
- }
+ if (!aff.lastVersion().equals(topVer))
+ calculateAndInit(fut.events(), aff, topVer);
grpHolder.topology().beforeExchange(fut, true, false);
}
@@ -1908,13 +1883,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
throws IgniteCheckedException
{
if (addedOnExchnage) {
- if (!aff.lastVersion().equals(evts.topologyVersion())) {
- List<List<ClusterNode>> newAff = aff.calculate(evts.topologyVersion(),
- evts.lastEvent(),
- evts.discoveryCache());
-
- aff.initialize(evts.topologyVersion(), newAff);
- }
+ if (!aff.lastVersion().equals(evts.topologyVersion()))
+ calculateAndInit(evts, aff, evts.topologyVersion());
return;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index 94040ba..6442752 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -56,7 +56,7 @@ public class ExchangeContext {
* @param fut Exchange future.
*/
public ExchangeContext(boolean crd, GridDhtPartitionsExchangeFuture fut) {
- int protocolVer = exchangeProtocolVersion(fut.discoCache().minimumNodeVersion());
+ int protocolVer = exchangeProtocolVersion(fut.firstEventCache().minimumNodeVersion());
if (compatibilityNode || (crd && fut.localJoinExchange())) {
fetchAffOnJoin = true;
@@ -71,7 +71,7 @@ public class ExchangeContext {
merge = !startCaches &&
protocolVer > 1 &&
- fut.discoveryEvent().type() != EVT_DISCOVERY_CUSTOM_EVT;
+ fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT;
}
evts = new ExchangeDiscoveryEvents(fut);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
index ff61701..d4fbe60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
@@ -71,7 +71,7 @@ public class ExchangeDiscoveryEvents {
* @param fut Current exchange future.
*/
ExchangeDiscoveryEvents(GridDhtPartitionsExchangeFuture fut) {
- addEvent(fut.initialVersion(), fut.discoveryEvent(), fut.discoCache());
+ addEvent(fut.initialVersion(), fut.firstEvent(), fut.firstEventCache());
}
/**
@@ -106,7 +106,7 @@ public class ExchangeDiscoveryEvents {
* @return Last server join/fail event version.
*/
AffinityTopologyVersion lastServerEventVersion() {
- assert srvEvtTopVer != null;
+ assert srvEvtTopVer != null : this;
return srvEvtTopVer;
}
@@ -169,7 +169,7 @@ public class ExchangeDiscoveryEvents {
/**
* @return Last event.
*/
- DiscoveryEvent lastEvent() {
+ public DiscoveryEvent lastEvent() {
return lastSrvEvt != null ? lastSrvEvt : lastEvt;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 6efd19e..75ddacd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1796,15 +1796,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
log.info("Merge exchange future on finish [curFut=" + curFut.initialVersion() +
", mergedFut=" + fut.initialVersion() +
- ", evt=" + IgniteUtils.gridEventName(fut.discoveryEvent().type()) +
- ", evtNode=" + fut.discoveryEvent().eventNode().id()+
- ", evtNodeClient=" + CU.clientNode(fut.discoveryEvent().eventNode())+ ']');
+ ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) +
+ ", evtNode=" + fut.firstEvent().eventNode().id()+
+ ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
- DiscoveryEvent evt = fut.discoveryEvent();
+ DiscoveryEvent evt = fut.firstEvent();
curFut.context().events().addEvent(fut.initialVersion(),
- fut.discoveryEvent(),
- fut.discoCache());
+ fut.firstEvent(),
+ fut.firstEventCache());
if (evt.type() == EVT_NODE_JOINED) {
final GridDhtPartitionsSingleMessage pendingMsg = fut.mergeJoinExchangeOnDone(curFut);
@@ -1888,7 +1888,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (task instanceof GridDhtPartitionsExchangeFuture) {
GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
- DiscoveryEvent evt = fut.discoveryEvent();
+ DiscoveryEvent evt = fut.firstEvent();
if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
log.info("Stop merge, custom event found: " + evt);
@@ -1911,13 +1911,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
log.info("Merge exchange future [curFut=" + curFut.initialVersion() +
", mergedFut=" + fut.initialVersion() +
- ", evt=" + IgniteUtils.gridEventName(fut.discoveryEvent().type()) +
- ", evtNode=" + fut.discoveryEvent().eventNode().id() +
- ", evtNodeClient=" + CU.clientNode(fut.discoveryEvent().eventNode())+ ']');
+ ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) +
+ ", evtNode=" + fut.firstEvent().eventNode().id() +
+ ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
curFut.context().events().addEvent(fut.initialVersion(),
- fut.discoveryEvent(),
- fut.discoCache());
+ fut.firstEvent(),
+ fut.firstEventCache());
if (evt.type() == EVT_NODE_JOINED) {
if (fut.mergeJoinExchange(curFut))
@@ -2214,7 +2214,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
boolean newCrd = false;
if (!crd) {
- List<ClusterNode> srvNodes = exchFut.discoCache().serverNodes();
+ List<ClusterNode> srvNodes = exchFut.firstEventCache().serverNodes();
crd = newCrd = !srvNodes.isEmpty() && srvNodes.get(0).isLocal();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 5a16bc9..12f3fb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -258,7 +258,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
if (stopping)
return;
- discoCache = exchFut.discoCache();
+ discoCache = exchFut.events().discoveryCache();
beforeExchange0(loc, exchFut);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index e99ad2c..5e58502 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -518,7 +518,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (grp.affinityNode()) {
if (grpStarted ||
- exchFut.discoveryEvent().type() == EVT_DISCOVERY_CUSTOM_EVT ||
+ exchFut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT ||
exchFut.serverNodeDiscoveryEvent()) {
if (affReady) {
assert grp.affinity().lastVersion().equals(evts.topologyVersion());
@@ -626,7 +626,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
changed = true;
if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
- DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+ DiscoveryEvent discoEvt = exchFut.events().lastEvent();
grp.addRebalanceEvent(p,
EVT_CACHE_REBALANCE_PART_DATA_LOST,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index be6acf4..3d7749b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -125,10 +125,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** */
@GridToStringExclude
- private volatile DiscoCache discoCache;
+ private volatile DiscoCache firstEvtDiscoCache;
- /** Discovery event. */
- private volatile DiscoveryEvent discoEvt;
+ /** Discovery event that triggered this exchange. */
+ private volatile DiscoveryEvent firstDiscoEvt;
/** */
@GridToStringExclude
@@ -174,7 +174,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** Last committed cache version before next topology version use. */
private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>();
- /** */
+ /**
+ * Message received from node joining cluster, needed if this join exchange
+ * is merged with previous one.
+ */
@GridToStringExclude
private GridDhtPartitionsSingleMessage pendingJoinMsg;
@@ -222,11 +225,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** */
private ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new ConcurrentHashMap8<>();
- /** */
+ /** Single messages from merged 'node join' exchanges. */
@GridToStringExclude
private Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs;
- /** */
+ /** Number of awaited messages for merged 'node join' exchanges. */
@GridToStringExclude
private int awaitMergedMsgs;
@@ -255,7 +258,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
@GridToStringExclude
private FinishState finishState;
- /** */
+ /** Initialized when node becomes new coordinator. */
@GridToStringExclude
private InitNewCoordinatorFuture newCrdFut;
@@ -371,15 +374,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- * @return Discovery cache.
- *
- * TODO 5578 review usages, rename initialDiscoveryEvent
- */
- public DiscoCache discoCache() {
- return discoCache;
- }
-
- /**
* @param cacheId Cache ID.
* @param rcvdFrom Node ID cache was received from.
* @return {@code True} if cache was added during this exchange.
@@ -431,8 +425,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
assert exchId.equals(this.exchId);
this.exchId.discoveryEvent(discoEvt);
- this.discoEvt = discoEvt;
- this.discoCache = discoCache;
+ this.firstDiscoEvt= discoEvt;
+ this.firstEvtDiscoCache = discoCache;
evtLatch.countDown();
}
@@ -459,12 +453,25 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- * @return Discovery event.
+ * @return First discovery event of this exchange.
*
- * TODO 5578 review usages, rename initialDiscoveryEvent
*/
- public DiscoveryEvent discoveryEvent() {
- return discoEvt;
+ public DiscoveryEvent firstEvent() {
+ return firstDiscoEvt;
+ }
+
+ /**
+ * @return Discovery cache for first event.
+ */
+ public DiscoCache firstEventCache() {
+ return firstEvtDiscoCache;
+ }
+
+ /**
+ * @return Events processed in this exchange.
+ */
+ public ExchangeDiscoveryEvents events() {
+ return exchCtx.events();
}
/**
@@ -525,13 +532,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
U.await(evtLatch);
- assert discoEvt != null : this;
- assert exchId.nodeId().equals(discoEvt.eventNode().id()) : this;
+ assert firstDiscoEvt != null : this;
+ assert exchId.nodeId().equals(firstDiscoEvt.eventNode().id()) : this;
try {
AffinityTopologyVersion topVer = initialVersion();
- srvNodes = new ArrayList<>(discoCache.serverNodes());
+ srvNodes = new ArrayList<>(firstEvtDiscoCache.serverNodes());
remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId()))));
@@ -551,18 +558,18 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (exchLog.isInfoEnabled()) {
exchLog.info("Started exchange init [topVer=" + topVer +
", crd=" + crdNode +
- ", evt=" + IgniteUtils.gridEventName(discoEvt.type()) +
- ", evtNode=" + discoEvt.eventNode().id() +
- ", customEvt=" + (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT ? ((DiscoveryCustomEvent) discoEvt).customMessage() : null) +
+ ", evt=" + IgniteUtils.gridEventName(firstDiscoEvt.type()) +
+ ", evtNode=" + firstDiscoEvt.eventNode().id() +
+ ", customEvt=" + (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT ? ((DiscoveryCustomEvent)firstDiscoEvt).customMessage() : null) +
", allowMerge=" + exchCtx.mergeExchanges() + ']');
}
ExchangeType exchange;
- if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+ if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
assert !exchCtx.mergeExchanges();
- DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)discoEvt).customMessage();
+ DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)firstDiscoEvt).customMessage();
if (msg instanceof ChangeGlobalStateMessage) {
assert exchActions != null && !exchActions.empty();
@@ -575,7 +582,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
exchange = onCacheChangeRequest(crdNode);
}
else if (msg instanceof SnapshotDiscoveryMessage) {
- exchange = CU.clientNode(discoEvt.eventNode()) ?
+ exchange = CU.clientNode(firstDiscoEvt.eventNode()) ?
onClientNodeEvent(crdNode) :
onServerNodeEvent(crdNode);
}
@@ -588,10 +595,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
initCoordinatorCaches(newCrd);
}
else {
- if (discoEvt.type() == EVT_NODE_JOINED) {
- if (!discoEvt.eventNode().isLocal()) {
+ if (firstDiscoEvt.type() == EVT_NODE_JOINED) {
+ if (!firstDiscoEvt.eventNode().isLocal()) {
Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(
- discoEvt.eventNode().id(),
+ firstDiscoEvt.eventNode().id(),
topVer);
cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
@@ -616,7 +623,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
else {
- if (CU.clientNode(discoEvt.eventNode()))
+ if (CU.clientNode(firstDiscoEvt.eventNode()))
exchange = onClientNodeEvent(crdNode);
else
exchange = cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
@@ -626,7 +633,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
onLeft();
}
else {
- exchange = CU.clientNode(discoEvt.eventNode()) ? onClientNodeEvent(crdNode) :
+ exchange = CU.clientNode(firstDiscoEvt.eventNode()) ? onClientNodeEvent(crdNode) :
onServerNodeEvent(crdNode);
}
}
@@ -761,13 +768,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
top.updateTopologyVersion(
this,
- discoCache(),
+ events().discoveryCache(),
updSeq,
cacheGroupStopping(grp.groupId()));
}
for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
- top.updateTopologyVersion(this, discoCache(), -1, cacheGroupStopping(top.groupId()));
+ top.updateTopologyVersion(this, events().discoveryCache(), -1, cacheGroupStopping(top.groupId()));
}
/**
@@ -892,19 +899,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @return Exchange type.
*/
private ExchangeType onClientNodeEvent(boolean crd) throws IgniteCheckedException {
- assert CU.clientNode(discoEvt.eventNode()) : this;
+ assert CU.clientNode(firstDiscoEvt.eventNode()) : this;
- if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED) {
+ if (firstDiscoEvt.type() == EVT_NODE_LEFT || firstDiscoEvt.type() == EVT_NODE_FAILED) {
onLeft();
- assert !discoEvt.eventNode().isLocal() : discoEvt;
+ assert !firstDiscoEvt.eventNode().isLocal() : firstDiscoEvt;
}
else
- assert discoEvt.type() == EVT_NODE_JOINED || discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt;
+ assert firstDiscoEvt.type() == EVT_NODE_JOINED || firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : firstDiscoEvt;
cctx.affinity().onClientEvent(this, crd);
- return discoEvt.eventNode().isLocal() ? ExchangeType.CLIENT : ExchangeType.NONE;
+ return firstDiscoEvt.eventNode().isLocal() ? ExchangeType.CLIENT : ExchangeType.NONE;
}
/**
@@ -913,9 +920,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @return Exchange type.
*/
private ExchangeType onServerNodeEvent(boolean crd) throws IgniteCheckedException {
- assert !CU.clientNode(discoEvt.eventNode()) : this;
+ assert !CU.clientNode(firstDiscoEvt.eventNode()) : this;
- if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED) {
+ if (firstDiscoEvt.type() == EVT_NODE_LEFT || firstDiscoEvt.type() == EVT_NODE_FAILED) {
onLeft();
exchCtx.events().warnNoAffinityNodes(cctx);
@@ -979,7 +986,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
waitPartitionRelease();
- boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;
+ boolean topChanged = firstDiscoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal() || cacheStopping(cacheCtx.cacheId()))
@@ -1021,7 +1028,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
try {
long start = U.currentTimeMillis();
- IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(discoEvt);
+ IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(events().lastEvent());
if (fut != null) {
fut.get();
@@ -1190,7 +1197,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @return {@code True} if exchange for local node join.
*/
public boolean localJoinExchange() {
- return discoEvt.type() == EVT_NODE_JOINED && discoEvt.eventNode().isLocal();
+ return firstDiscoEvt.type() == EVT_NODE_JOINED && firstDiscoEvt.eventNode().isLocal();
}
/**
@@ -1423,7 +1430,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Map<Integer, CacheValidation> m = U.newHashMap(cctx.cache().cacheGroups().size());
for (CacheGroupContext grp : cctx.cache().cacheGroups())
- m.put(grp.groupId(), validateCacheGroup(grp, discoEvt.topologyNodes()));
+ m.put(grp.groupId(), validateCacheGroup(grp, events().lastEvent().topologyNodes()));
grpValidRes = m;
}
@@ -1483,8 +1490,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
exchActions = null;
- if (discoEvt instanceof DiscoveryCustomEvent)
- ((DiscoveryCustomEvent)discoEvt).customMessage(null);
+ if (firstDiscoEvt instanceof DiscoveryCustomEvent)
+ ((DiscoveryCustomEvent)firstDiscoEvt).customMessage(null);
if (err == null)
cctx.exchange().lastFinishedFuture(this);
@@ -1528,9 +1535,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
+ * Records that this exchange is merged with another 'node join' exchange.
+ *
* @param node Joined node.
* @param msg Joined node message if already received.
- * @return {@code True} if need wait for message from joined server node.
+ * @return {@code True} if need to wait for message from joined server node.
*/
private boolean addMergedJoinExchange(ClusterNode node, @Nullable GridDhtPartitionsSingleMessage msg) {
assert Thread.holdsLock(mux);
@@ -1582,6 +1591,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
+ * Merges this exchange with given one.
+ *
* @param fut Current exchange to merge with.
* @return {@code True} if need wait for message from joined server node.
*/
@@ -1596,7 +1607,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
mergedWith = fut;
- ClusterNode joinedNode = discoEvt.eventNode();
+ ClusterNode joinedNode = firstDiscoEvt.eventNode();
wait = fut.addMergedJoinExchange(joinedNode, pendingJoinMsg);
}
@@ -1784,7 +1795,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
if (finishState0 == null) {
- assert discoEvt.type() == EVT_NODE_JOINED && CU.clientNode(discoEvt.eventNode()) : this;
+ assert firstDiscoEvt.type() == EVT_NODE_JOINED && CU.clientNode(firstDiscoEvt.eventNode()) : this;
finishState0 = new FinishState(cctx.localNodeId(),
initialVersion(),
@@ -2093,7 +2104,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (!grp.isLocal()) {
- boolean detectedOnGrp = grp.topology().detectLostPartitions(resTopVer, discoEvt);
+ boolean detectedOnGrp = grp.topology().detectLostPartitions(resTopVer, events().lastEvent());
detected |= detectedOnGrp;
}
@@ -2130,7 +2141,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- *
+ * @param sndResNodes Additional nodes to send finish message to.
*/
private void onAllReceived(@Nullable Collection<ClusterNode> sndResNodes) {
try {
@@ -2138,7 +2149,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
assert partHistSuppliers.isEmpty() : partHistSuppliers;
- if (!crd.equals(discoCache.serverNodes().get(0)) && !exchCtx.mergeExchanges()) {
+ if (!exchCtx.mergeExchanges() && !crd.equals(events().discoveryCache().serverNodes().get(0))) {
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (!grp.isLocal())
grp.topology().beforeExchange(this, !centralizedAff, false);
@@ -2165,7 +2176,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- *
+ * @param sndResNodes Additional nodes to send finish message to.
*/
private void finishExchangeOnCoordinator(@Nullable Collection<ClusterNode> sndResNodes) {
try {
@@ -2239,13 +2250,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
- if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
- assert discoEvt instanceof DiscoveryCustomEvent;
+ if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+ assert firstDiscoEvt instanceof DiscoveryCustomEvent;
if (activateCluster())
assignPartitionsStates();
- if (((DiscoveryCustomEvent)discoEvt).customMessage() instanceof DynamicCacheChangeBatch) {
+ if (((DiscoveryCustomEvent)firstDiscoEvt).customMessage() instanceof DynamicCacheChangeBatch) {
if (exchActions != null) {
Set<String> caches = exchActions.cachesToResetLostPartitions();
@@ -2906,7 +2917,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
ClusterNode crd0;
- discoCache.updateAlives(node);
+ events().discoveryCache().updateAlives(node);
InitNewCoordinatorFuture newCrdFut0;
@@ -3269,8 +3280,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
*/
public String shortInfo() {
return "GridDhtPartitionsExchangeFuture [topVer=" + initialVersion() +
- ", evt=" + (discoEvt != null ? IgniteUtils.gridEventName(discoEvt.type()) : -1) +
- ", evtNode=" + (discoEvt != null ? discoEvt.eventNode() : null) +
+ ", evt=" + (firstDiscoEvt != null ? IgniteUtils.gridEventName(firstDiscoEvt.type()) : -1) +
+ ", evtNode=" + (firstDiscoEvt != null ? firstDiscoEvt.eventNode() : null) +
", done=" + isDone() + ']';
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
index 71db8ad..42ce9b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
@@ -102,7 +102,7 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
if (restoreState) {
DiscoCache curDiscoCache = cctx.discovery().discoCache();
- DiscoCache discoCache = exchFut.discoCache();
+ DiscoCache discoCache = exchFut.events().discoveryCache();
List<ClusterNode> nodes = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 7713caa5..0178bb6 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -721,7 +721,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** {@inheritDoc} */
@Override public void beforeExchange(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
- DiscoveryEvent discoEvt = fut.discoveryEvent();
+ DiscoveryEvent discoEvt = fut.firstEvent();
boolean joinEvt = discoEvt.type() == EventType.EVT_NODE_JOINED;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 0d106a4..695d8a6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -2809,7 +2809,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
GridDhtPartitionsExchangeFuture fut = futs.get(i);
if (fut.initialVersion().equals(topVer0)) {
- evt = fut.discoveryEvent();
+ evt = fut.firstEvent();
break;
}
[2/4] ignite git commit: ignite-5578
Posted by sb...@apache.org.
ignite-5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1579ea1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1579ea1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1579ea1
Branch: refs/heads/ignite-5578
Commit: b1579ea1b1980e4cdf4fe378f21013336063909e
Parents: 6b6ab17
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 3 14:09:23 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 3 14:09:23 2017 +0300
----------------------------------------------------------------------
.../cache/IgniteDynamicCacheStartSelfTest.java | 21 ++++++--------------
1 file changed, 6 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1579ea1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 05dbce0..8f601f8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -324,9 +324,6 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
for (int g = 0; g < nodeCount(); g++) {
IgniteEx kernal0 = grid(g);
- for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures())
- f.get();
-
info("Getting cache for node: " + g);
assertNotNull(grid(g).cache(DYNAMIC_CACHE_NAME));
@@ -345,14 +342,13 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
kernal.destroyCache(DYNAMIC_CACHE_NAME);
+ awaitPartitionMapExchange();
+
for (int g = 0; g < nodeCount(); g++) {
final IgniteKernal kernal0 = (IgniteKernal)grid(g);
final int idx = g;
- for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures())
- f.get();
-
assertNull(kernal0.cache(DYNAMIC_CACHE_NAME));
GridTestUtils.assertThrows(log, new Callable<Object>() {
@@ -387,9 +383,6 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
for (int g = 0; g < nodeCount(); g++) {
IgniteEx kernal0 = grid(g);
- for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures())
- f.get();
-
info("Getting cache for node: " + g);
for (int i = 0; i < cacheCnt; i++)
@@ -423,6 +416,8 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
kernal.destroyCaches(namesToDestroy);
+ awaitPartitionMapExchange();
+
for (int g = 0; g < nodeCount(); g++) {
final IgniteKernal kernal0 = (IgniteKernal)grid(g);
@@ -430,9 +425,6 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
final int idx = g * nodeCount() + i;
final int expVal = i;
- for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures())
- f.get();
-
assertNull(kernal0.cache(DYNAMIC_CACHE_NAME));
GridTestUtils.assertThrows(log, new Callable<Object>() {
@@ -536,13 +528,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
}
}
+ awaitPartitionMapExchange();
+
// Check that cache is not deployed on new node after undeploy.
for (int g = 0; g < nodeCount() + 2; g++) {
final IgniteKernal kernal0 = (IgniteKernal)grid(g);
- for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures())
- f.get();
-
if (g < nodeCount())
assertNotNull(grid(g).cache(DYNAMIC_CACHE_NAME));
else
[4/4] ignite git commit: ignite-5578
Posted by sb...@apache.org.
ignite-5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4030225
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4030225
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4030225
Branch: refs/heads/ignite-5578
Commit: d4030225aa813052dd63c2d78428fb3ab077873c
Parents: e3ecc16
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 3 15:55:31 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 3 15:55:31 2017 +0300
----------------------------------------------------------------------
.../GridDhtPartitionsExchangeFuture.java | 54 ++++++++++++--------
.../distributed/CacheExchangeMergeTest.java | 19 +++++++
2 files changed, 51 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4030225/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 3d7749b..b43fe92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -175,8 +175,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>();
/**
- * Message received from node joining cluster, needed if this join exchange
- * is merged with previous one.
+ * Message received from node joining cluster (if this is 'node join' exchange),
+ * needed if this exchange is merged with another one.
*/
@GridToStringExclude
private GridDhtPartitionsSingleMessage pendingJoinMsg;
@@ -2877,6 +2877,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
initFut.onDone(true);
}
+ /**
+ *
+ */
private void onAllServersLeft() {
assert cctx.kernalContext().clientNode() : cctx.localNode();
@@ -3235,20 +3238,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
return exchId.hashCode();
}
- /**
- *
- */
- enum ExchangeType {
- /** */
- CLIENT,
-
- /** */
- ALL,
-
- /** */
- NONE
- }
-
/** {@inheritDoc} */
@Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext diagCtx) {
if (!isDone()) {
@@ -3376,18 +3365,39 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
*
*/
- private enum ExchangeLocalState {
+ enum ExchangeType {
/** */
- CRD,
+ CLIENT,
+
/** */
- SRV,
+ ALL,
+
/** */
+ NONE
+ }
+ /**
+ *
+ */
+ private enum ExchangeLocalState {
+ /** Local node is coordinator. */
+ CRD,
+
+ /** Local node is non-coordinator server. */
+ SRV,
+
+ /** Local node is client node. */
CLIENT,
- /** */
+
+ /**
+ * Previous coordinator failed before exchange finished and
+ * local node performs initialization to become new coordinator.
+ */
BECOME_CRD,
- /** */
+
+ /** Exchange finished. */
DONE,
- /** */
+
+ /** This exchange was merged with another one. */
MERGED
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4030225/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index 04d406c..ec41060 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -656,6 +656,25 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testMergeAndNewCoordinator() throws Exception {
+ final Ignite srv0 = startGrids(3);
+
+ mergeExchangeWaitVersion(srv0, 6);
+
+ IgniteInternalFuture fut = startGrids(srv0, 3, 3);
+
+ fut.get();
+
+ checkCaches();
+
+ stopGrid(0);
+
+ checkCaches();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testMergeServersFail1_1() throws Exception {
mergeServersFail1(false);
}