You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/31 09:01:16 UTC
ignite git commit: disco cache cleanup
Repository: ignite
Updated Branches:
refs/heads/ignite-3478-1 [created] b7b9089f0
disco cache cleanup
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7b9089f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7b9089f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7b9089f
Branch: refs/heads/ignite-3478-1
Commit: b7b9089f0102b8cab9942a9c887d93e9f26cc7d2
Parents: bc8e645
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 31 12:00:36 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 31 12:00:36 2017 +0300
----------------------------------------------------------------------
.../internal/managers/discovery/DiscoCache.java | 62 ++++----------------
.../discovery/GridDiscoveryManager.java | 38 ++----------
.../GridCachePartitionExchangeManager.java | 4 +-
.../processors/cache/GridCacheUtils.java | 53 ++---------------
.../dht/GridClientPartitionTopology.java | 8 +--
.../dht/GridDhtPartitionTopologyImpl.java | 6 +-
.../dht/atomic/GridDhtAtomicCache.java | 2 +-
.../dht/colocated/GridDhtColocatedCache.java | 4 +-
.../GridDhtPartitionsExchangeFuture.java | 2 +-
.../near/GridNearTransactionalCache.java | 4 +-
.../cache/query/GridCacheQueryAdapter.java | 2 +-
.../cache/query/GridCacheQueryManager.java | 6 +-
.../service/GridServiceProcessor.java | 4 +-
.../CacheLateAffinityAssignmentTest.java | 2 +-
14 files changed, 40 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b9089f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 5ac99f1..4b57eb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -58,13 +58,6 @@ public class DiscoCache {
/** Daemon nodes. */
private final List<ClusterNode> daemonNodes;
- /** All server nodes. */
- private final List<ClusterNode> srvNodesWithCaches;
-
- /** All nodes with at least one cache configured. */
- @GridToStringInclude
- private final List<ClusterNode> allNodesWithCaches;
-
/** All remote nodes with at least one cache configured. */
@GridToStringInclude
private final List<ClusterNode> rmtNodesWithCaches;
@@ -97,8 +90,6 @@ public class DiscoCache {
* @param allNodes All nodes.
* @param srvNodes Server nodes.
* @param daemonNodes Daemon nodes.
- * @param srvNodesWithCaches Server nodes with at least one cache configured.
- * @param allNodesWithCaches All nodes with at least one cache configured.
* @param rmtNodesWithCaches Remote nodes with at least one cache configured.
* @param allCacheNodes Cache nodes by cache name.
* @param cacheGrpAffNodes Affinity nodes by cache group ID.
@@ -113,8 +104,6 @@ public class DiscoCache {
List<ClusterNode> allNodes,
List<ClusterNode> srvNodes,
List<ClusterNode> daemonNodes,
- List<ClusterNode> srvNodesWithCaches,
- List<ClusterNode> allNodesWithCaches,
List<ClusterNode> rmtNodesWithCaches,
Map<Integer, List<ClusterNode>> allCacheNodes,
Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
@@ -127,8 +116,6 @@ public class DiscoCache {
this.allNodes = allNodes;
this.srvNodes = srvNodes;
this.daemonNodes = daemonNodes;
- this.srvNodesWithCaches = srvNodesWithCaches;
- this.allNodesWithCaches = allNodesWithCaches;
this.rmtNodesWithCaches = rmtNodesWithCaches;
this.allCacheNodes = allCacheNodes;
this.cacheGrpAffNodes = cacheGrpAffNodes;
@@ -195,36 +182,13 @@ public class DiscoCache {
return daemonNodes;
}
- /** @return Server nodes with at least one cache configured. */
- public List<ClusterNode> serverNodesWithCaches() {
- return srvNodesWithCaches;
- }
-
/**
- * Gets all remote nodes that have at least one cache configured.
+ * Gets all alive remote nodes that have at least one cache configured.
*
* @return Collection of nodes.
*/
- public List<ClusterNode> remoteNodesWithCaches() {
- return rmtNodesWithCaches;
- }
-
- /**
- * Gets collection of nodes with at least one cache configured.
- *
- * @return Collection of nodes.
- */
- public List<ClusterNode> allNodesWithCaches() {
- return allNodesWithCaches;
- }
-
- /**
- * Gets collection of server nodes with at least one cache configured.
- *
- * @return Collection of nodes.
- */
- public Collection<ClusterNode> aliveServerNodes() {
- return F.view(serverNodes(), new P1<ClusterNode>() {
+ public Collection<ClusterNode> remoteAliveNodesWithCaches() {
+ return F.view(rmtNodesWithCaches, new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode node) {
return alives.contains(node.id());
}
@@ -236,8 +200,8 @@ public class DiscoCache {
*
* @return Collection of nodes.
*/
- public Collection<ClusterNode> aliveServerNodesWithCaches() {
- return F.view(serverNodesWithCaches(), new P1<ClusterNode>() {
+ public Collection<ClusterNode> aliveServerNodes() {
+ return F.view(serverNodes(), new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode node) {
return alives.contains(node.id());
}
@@ -248,16 +212,14 @@ public class DiscoCache {
* @return Oldest alive server node.
*/
public @Nullable ClusterNode oldestAliveServerNode(){
- Iterator<ClusterNode> it = aliveServerNodes().iterator();
- return it.hasNext() ? it.next() : null;
- }
+ for (int i = 0; i < srvNodes.size(); i++) {
+ ClusterNode srv = srvNodes.get(i);
- /**
- * @return Oldest alive server node with at least one cache configured.
- */
- public @Nullable ClusterNode oldestAliveServerNodeWithCache(){
- Iterator<ClusterNode> it = aliveServerNodesWithCaches().iterator();
- return it.hasNext() ? it.next() : null;
+ if (alives.contains(srv.id()))
+ return srv;
+ }
+
+ return null;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b9089f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index d426ca5..8e3f9fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1805,42 +1805,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * Gets cache nodes for cache with given ID.
- *
- * @param cacheId Cache ID.
- * @param topVer Topology version.
- * @return Collection of cache nodes.
- */
- public Collection<ClusterNode> cacheNodes(int cacheId, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheId, topVer).cacheNodes(cacheId);
- }
-
- /**
- * Gets all nodes with at least one cache configured.
- *
- * @param topVer Topology version.
- * @return Collection of cache nodes.
- */
- public Collection<ClusterNode> cacheNodes(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(CU.cacheId(null), topVer).allNodesWithCaches();
- }
-
- /**
* Gets cache remote nodes for cache with given name.
*
* @param topVer Topology version.
* @return Collection of cache nodes.
*/
- public Collection<ClusterNode> remoteCacheNodes(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(CU.cacheId(null), topVer).remoteNodesWithCaches();
+ public Collection<ClusterNode> remoteAliveNodesWithCaches(AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(CU.cacheId(null), topVer).remoteAliveNodesWithCaches();
}
/**
* @param topVer Topology version (maximum allowed node order).
* @return Oldest alive server nodes with at least one cache configured.
*/
- @Nullable public ClusterNode oldestAliveCacheServerNode(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(CU.cacheId(null), topVer).oldestAliveServerNodeWithCache();
+ @Nullable public ClusterNode oldestAliveServerNode(AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(CU.cacheId(null), topVer).oldestAliveServerNode();
}
/**
@@ -2203,9 +2182,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size());
Map<Integer, List<ClusterNode>> cacheGrpAffNodes = U.newHashMap(allNodes.size());
- Set<ClusterNode> allNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
- Set<ClusterNode> srvNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
for (ClusterNode node : allNodes) {
assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
@@ -2230,11 +2207,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
CachePredicate filter = entry.getValue();
if (filter.cacheNode(node)) {
- allNodesWithCaches.add(node);
-
- if(!CU.clientNode(node))
- srvNodesWithCaches.add(node);
-
if (!node.isLocal())
rmtNodesWithCaches.add(node);
@@ -2251,8 +2223,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
Collections.unmodifiableList(allNodes),
Collections.unmodifiableList(srvNodes),
Collections.unmodifiableList(daemonNodes),
- U.sealList(srvNodesWithCaches),
- U.sealList(allNodesWithCaches),
U.sealList(rmtNodesWithCaches),
Collections.unmodifiableMap(allCacheNodes),
Collections.unmodifiableMap(cacheGrpAffNodes),
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b9089f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index bd34a5f..5ec6bfb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -925,7 +925,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* for non coordinator - {@link GridDhtPartitionsSingleMessage SingleMessages} send
*/
private void refreshPartitions() {
- ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
+ ClusterNode oldest = cctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE);
if (oldest == null) {
if (log.isDebugEnabled())
@@ -955,7 +955,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
AffinityTopologyVersion rmtTopVer =
lastFut != null ? (lastFut.isDone() ? lastFut.topologyVersion() : lastFut.initialVersion()) : AffinityTopologyVersion.NONE;
- Collection<ClusterNode> rmts = CU.remoteNodes(cctx, rmtTopVer);
+ Collection<ClusterNode> rmts = cctx.discovery().remoteAliveNodesWithCaches(rmtTopVer);
if (log.isDebugEnabled())
log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId());
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b9089f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index f94cfb5..5885fcd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -438,59 +438,14 @@ public class GridCacheUtils {
}
/**
- * Gets all nodes on which cache with the same name is started.
- *
- * @param ctx Cache context.
- * @param topOrder Maximum allowed node order.
- * @return All nodes on which cache with the same name is started (including nodes
- * that may have already left).
- */
- public static Collection<ClusterNode> allNodes(GridCacheContext ctx, AffinityTopologyVersion topOrder) {
- return ctx.discovery().cacheNodes(ctx.cacheId(), topOrder);
- }
-
- /**
- * Gets all nodes with at least one cache configured.
- *
- * @param ctx Shared cache context.
- * @param topOrder Maximum allowed node order.
- * @return All nodes on which cache with the same name is started (including nodes
- * that may have already left).
- */
- public static Collection<ClusterNode> allNodes(GridCacheSharedContext ctx, AffinityTopologyVersion topOrder) {
- return ctx.discovery().cacheNodes(topOrder);
- }
-
- /**
- * Gets remote nodes with at least one cache configured.
- *
- * @param ctx Cache shared context.
- * @param topVer Topology version.
- * @return Collection of remote nodes with at least one cache configured.
- */
- public static Collection<ClusterNode> remoteNodes(final GridCacheSharedContext ctx, AffinityTopologyVersion topVer) {
- return ctx.discovery().remoteCacheNodes(topVer);
- }
-
- /**
- * Gets all nodes on which cache with the same name is started and the local DHT storage is enabled.
- *
- * @param ctx Cache context.
- * @return All nodes on which cache with the same name is started.
- */
- public static Collection<ClusterNode> affinityNodes(final GridCacheContext ctx) {
- return ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), AffinityTopologyVersion.NONE);
- }
-
- /**
* Gets DHT affinity nodes.
*
* @param ctx Cache context.
- * @param topOrder Maximum allowed node order.
- * @return Affinity nodes.
+ * @param topVer Topology version.
+ * @return Cache affinity nodes for given topology version.
*/
- public static Collection<ClusterNode> affinityNodes(GridCacheContext ctx, AffinityTopologyVersion topOrder) {
- return ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), topOrder);
+ public static Collection<ClusterNode> affinityNodes(GridCacheContext ctx, AffinityTopologyVersion topVer) {
+ return ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), topVer);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b9089f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 299394f..4ac9195 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -338,7 +338,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
// In case if node joins, get topology at the time of joining node.
- ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
+ ClusterNode oldest = discoCache.oldestAliveServerNode();
assert oldest != null;
@@ -535,7 +535,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
* @return List of nodes for the partition.
*/
private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
- Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.allNodesWithCaches()) : null;
+ Collection<UUID> allIds = F.nodeIds(discoCache.cacheGroupAffinityNodes(grpId));
lock.readLock().lock();
@@ -961,10 +961,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
assert nodeId.equals(cctx.localNodeId());
// In case if node joins, get topology at the time of joining node.
- ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
+ ClusterNode oldest = discoCache.oldestAliveServerNode();
// If this node became the oldest node.
- if (oldest.id().equals(cctx.localNodeId())) {
+ if (cctx.localNode().equals(oldest)) {
long seq = node2part.updateSequence();
if (seq != updateSeq) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b9089f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index f7f71a1..01d972d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -326,7 +326,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (grp.affinityNode()) {
ClusterNode loc = ctx.localNode();
- ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
+ ClusterNode oldest = discoCache.oldestAliveServerNode();
GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
@@ -466,7 +466,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
lastTopChangeVer = readyTopVer = evts.topologyVersion();
}
- ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
+ ClusterNode oldest = discoCache.oldestAliveServerNode();
if (log.isDebugEnabled()) {
log.debug("Partition map beforeExchange [exchId=" + exchFut.exchangeId() +
@@ -2047,7 +2047,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
private long updateLocal(int p, GridDhtPartitionState state, long updateSeq, AffinityTopologyVersion affVer) {
assert lock.isWriteLockedByCurrentThread();
- ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
+ ClusterNode oldest = discoCache.oldestAliveServerNode();
assert oldest != null || ctx.kernalContext().clientNode();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b9089f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 1467bfa..7d0f747 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1965,7 +1965,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (dhtFut != null) {
if (req.writeSynchronizationMode() == PRIMARY_SYNC
// To avoid deadlock disable back-pressure for sender data node.
- && !ctx.discovery().cacheAffinityNode(node, ctx.name())
+ && !ctx.discovery().cacheGroupAffinityNode(node, ctx.groupId())
&& !dhtFut.isDone()) {
final IgniteRunnable tracker = GridNioBackPressureControl.threadTracker();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b9089f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index c72f53e..053bbe5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -712,7 +712,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
}
if (map == null) {
- Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer);
+ Collection<ClusterNode> affNodes = CU.affinityNodes(ctx, topVer);
keyCnt = (int)Math.ceil((double)keys.size() / affNodes.size());
@@ -815,7 +815,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
AffinityTopologyVersion topVer = lock.topologyVersion();
if (map == null) {
- Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer);
+ Collection<ClusterNode> affNodes = CU.affinityNodes(ctx, topVer);
keyCnt = (int)Math.ceil((double)keys.size() / affNodes.size());
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b9089f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 240b5f0..6d85222 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2826,7 +2826,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
null);
}
else {
- ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
+ ClusterNode oldest = cctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE);
if (oldest != null && oldest.isLocal()) {
GridDhtPartitionTopology top = cctx.exchange().clientTopology(grpId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b9089f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index a691cbc..973a199 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -540,7 +540,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
ver = cand.version();
if (map == null) {
- Collection<ClusterNode> affNodes = CU.allNodes(ctx, cand.topologyVersion());
+ Collection<ClusterNode> affNodes = CU.affinityNodes(ctx, cand.topologyVersion());
if (F.isEmpty(affNodes))
return;
@@ -663,7 +663,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
if (cand != null) {
if (map == null) {
- Collection<ClusterNode> affNodes = CU.allNodes(ctx, cand.topologyVersion());
+ Collection<ClusterNode> affNodes = CU.affinityNodes(ctx, cand.topologyVersion());
if (F.isEmpty(affNodes))
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b9089f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index c4eae8c..b5fdd23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -600,7 +600,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
- Collection<ClusterNode> affNodes = CU.affinityNodes(cctx);
+ Collection<ClusterNode> affNodes = CU.affinityNodes(cctx, topVer);
if (prj == null && part == null)
return affNodes;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b9089f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 3e27720..f873461 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1859,11 +1859,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
Callable<Collection<CacheSqlMetadata>> job = new MetadataJob();
// Remote nodes that have current cache.
- Collection<ClusterNode> nodes = F.view(cctx.discovery().remoteNodes(), new P1<ClusterNode>() {
- @Override public boolean apply(ClusterNode n) {
- return cctx.kernalContext().discovery().cacheAffinityNode(n, cacheName);
- }
- });
+ Collection<ClusterNode> nodes = CU.affinityNodes(cctx, AffinityTopologyVersion.NONE);
Collection<Collection<CacheSqlMetadata>> res = new ArrayList<>(nodes.size() + 1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b9089f/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 1d8720c..28b2983 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1292,7 +1292,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
if (!cache.context().affinityNode()) {
ClusterNode oldestSrvNode =
- ctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
+ ctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE);
if (oldestSrvNode == null)
return new GridEmptyIterator<>();
@@ -1566,7 +1566,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
// In case the cache instance isn't tracked by DiscoveryManager anymore.
discoCache.updateAlives(ctx.discovery());
- ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
+ ClusterNode oldest = discoCache.oldestAliveServerNode();
if (oldest != null && oldest.isLocal()) {
final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b9089f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 695d8a6..95e9479 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -2826,7 +2826,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
assertNotNull("Failed to find exchange future:", evt);
- Collection<ClusterNode> allNodes = ctx.discovery().cacheNodes(topVer0);
+ Collection<ClusterNode> allNodes = ctx.discovery().serverNodes(topVer0);
for (DynamicCacheDescriptor cacheDesc : ctx.cache().cacheDescriptors().values()) {
if (assignments.get(cacheDesc.cacheId()) != null)