You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2017/04/03 15:31:22 UTC
[1/3] ignite git commit: IGNITE-4779 Missed discovery data snapshot during exchange processing (Backport from 2.0)
Repository: ignite
Updated Branches:
refs/heads/master d34a4d06d -> 889594f45
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 7cf75fe..81f21a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -48,14 +48,14 @@ import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
@@ -173,35 +173,33 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
private DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
/** Discovery listener. */
- private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
+ private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
+ @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
if (!enterBusy())
return;
try {
- DiscoveryEvent e = (DiscoveryEvent)evt;
-
ClusterNode loc = cctx.localNode();
- assert e.type() == EVT_NODE_JOINED || e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED ||
- e.type() == EVT_DISCOVERY_CUSTOM_EVT;
+ assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ||
+ evt.type() == EVT_DISCOVERY_CUSTOM_EVT;
- final ClusterNode n = e.eventNode();
+ final ClusterNode n = evt.eventNode();
GridDhtPartitionExchangeId exchId = null;
GridDhtPartitionsExchangeFuture exchFut = null;
- if (e.type() != EVT_DISCOVERY_CUSTOM_EVT) {
+ if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) {
assert !loc.id().equals(n.id());
- if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) {
+ if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) {
assert cctx.discovery().node(n.id()) == null;
// Avoid race b/w initial future add and discovery event.
GridDhtPartitionsExchangeFuture initFut = null;
if (readyTopVer.get().equals(AffinityTopologyVersion.NONE)) {
- initFut = exchangeFuture(initialExchangeId(), null, null, null);
+ initFut = exchangeFuture(initialExchangeId(), null, null, null, null);
initFut.onNodeLeft(n);
}
@@ -213,18 +211,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
assert
- e.type() != EVT_NODE_JOINED || n.order() > loc.order() :
+ evt.type() != EVT_NODE_JOINED || n.order() > loc.order() :
"Node joined with smaller-than-local " +
"order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
exchId = exchangeId(n.id(),
- affinityTopologyVersion(e),
- e.type());
+ affinityTopologyVersion(evt),
+ evt.type());
- exchFut = exchangeFuture(exchId, e, null, null);
+ exchFut = exchangeFuture(exchId, evt, cache,null, null);
}
else {
- DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e;
+ DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt;
if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) {
DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage();
@@ -254,9 +252,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
if (!F.isEmpty(valid)) {
- exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
- exchFut = exchangeFuture(exchId, e, valid, null);
+ exchFut = exchangeFuture(exchId, evt, cache, valid, null);
}
}
else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) {
@@ -264,13 +262,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (msg.exchangeId() == null) {
if (msg.exchangeNeeded()) {
- exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
- exchFut = exchangeFuture(exchId, e, null, msg);
+ exchFut = exchangeFuture(exchId, evt, cache, null, msg);
}
}
else
- exchangeFuture(msg.exchangeId(), null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
+ exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
}
}
@@ -279,7 +277,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
log.debug("Discovery event (will start exchange): " + exchId);
// Event callback - without this callback future will never complete.
- exchFut.onEvent(exchId, e);
+ exchFut.onEvent(exchId, evt, cache);
// Start exchange process.
addFuture(exchFut);
@@ -301,7 +299,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchWorker = new ExchangeWorker();
- cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
+ cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
EVT_DISCOVERY_CUSTOM_EVT);
cctx.io().addHandler(0, GridDhtPartitionsSingleMessage.class,
@@ -359,11 +357,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
assert startTime > 0;
// Generate dummy discovery event for local node joining.
- DiscoveryEvent discoEvt = cctx.discovery().localJoinEvent();
+ T2<DiscoveryEvent, DiscoCache> localJoin = cctx.discovery().localJoin();
+
+ DiscoveryEvent discoEvt = localJoin.get1();
+ DiscoCache discoCache = localJoin.get2();
GridDhtPartitionExchangeId exchId = initialExchangeId();
- GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, null, null);
+ GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, discoCache, null, null);
if (reconnect)
reconnectExchangeFut = new GridFutureAdapter<>();
@@ -470,7 +471,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
- cctx.gridEvents().removeLocalEventListener(discoLsnr);
+ cctx.gridEvents().removeDiscoveryEventListener(discoLsnr);
cctx.io().removeHandler(0, GridDhtPartitionsSingleMessage.class);
cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class);
@@ -1063,12 +1064,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* @param exchId Exchange ID.
* @param discoEvt Discovery event.
+ * @param cache Discovery data cache.
* @param reqs Cache change requests.
* @param affChangeMsg Affinity change message.
* @return Exchange future.
*/
private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
@Nullable DiscoveryEvent discoEvt,
+ @Nullable DiscoCache cache,
@Nullable Collection<DynamicCacheChangeRequest> reqs,
@Nullable CacheAffinityChangeMessage affChangeMsg) {
GridDhtPartitionsExchangeFuture fut;
@@ -1087,7 +1090,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
if (discoEvt != null)
- fut.onEvent(exchId, discoEvt);
+ fut.onEvent(exchId, discoEvt, cache);
if (stopErr != null)
fut.onDone(stopErr);
@@ -1231,7 +1234,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
refreshPartitions();
}
else
- exchangeFuture(msg.exchangeId(), null, null, null).onReceive(node, msg);
+ exchangeFuture(msg.exchangeId(), null, null, null, null).onReceive(node, msg);
}
finally {
leaveBusy();
@@ -1287,6 +1290,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
null,
null,
+ null,
null);
exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@@ -1297,7 +1301,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
});
}
else
- exchangeFuture(msg.exchangeId(), null, null, null).onReceive(node, msg);
+ exchangeFuture(msg.exchangeId(), null, null, null, null).onReceive(node, msg);
}
}
finally {
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 816132d..9366d0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -31,6 +31,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -42,7 +43,6 @@ import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -103,6 +103,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** */
private final Object similarAffKey;
+ /** */
+ private volatile DiscoCache discoCache;
+
/**
* @param cctx Context.
* @param cacheId Cache ID.
@@ -121,6 +124,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
topVer = exchFut.topologyVersion();
+ discoCache = exchFut.discoCache();
+
log = cctx.logger(getClass());
lock.writeLock().lock();
@@ -191,6 +196,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
this.stopping = stopping;
topVer = exchId.topologyVersion();
+ discoCache = exchFut.discoCache();
updateSeq.setIfGreater(updSeq);
@@ -271,7 +277,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
removeNode(exchId.nodeId());
// In case if node joins, get topology at the time of joining node.
- ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
assert oldest != null;
@@ -424,7 +430,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
if (!F.isEmpty(nodeIds)) {
for (UUID nodeId : nodeIds) {
- ClusterNode n = cctx.discovery().node(nodeId);
+ ClusterNode n = discoCache.node(nodeId);
if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) {
if (nodes == null)
@@ -450,7 +456,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
* @return List of nodes for the partition.
*/
private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
- Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null;
+ Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.allNodesWithCaches()) : null;
lock.readLock().lock();
@@ -473,7 +479,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
continue;
if (hasState(p, id, state, states)) {
- ClusterNode n = cctx.discovery().node(id);
+ ClusterNode n = discoCache.node(id);
if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion()))
nodes.add(n);
@@ -766,7 +772,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
assert nodeId.equals(cctx.localNodeId());
// In case if node joins, get topology at the time of joining node.
- ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
// If this node became the oldest node.
if (oldest.id().equals(cctx.localNodeId())) {
@@ -816,7 +822,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
assert nodeId != null;
assert lock.writeLock().isHeldByCurrentThread();
- ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
ClusterNode loc = cctx.localNode();
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index ab8e863..2c3d7ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridNodeOrderComparator;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -72,16 +73,18 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
* @param ctx Context.
* @param cacheName Cache name.
* @param topVer Topology version.
+ * @param discoCache Discovery cache.
*/
public GridDhtAssignmentFetchFuture(
GridCacheSharedContext ctx,
String cacheName,
- AffinityTopologyVersion topVer
+ AffinityTopologyVersion topVer,
+ DiscoCache discoCache
) {
this.ctx = ctx;
this.key = new T2<>(CU.cacheId(cacheName), topVer);
- Collection<ClusterNode> availableNodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer);
+ Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(CU.cacheId(cacheName));
LinkedList<ClusterNode> tmp = new LinkedList<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 1b4dcc9..5c3fba0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -95,6 +96,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** */
private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
+ /** Discovery cache. */
+ private volatile DiscoCache discoCache;
+
/** */
private volatile boolean stopping;
@@ -151,6 +155,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
rebalancedTopVer = AffinityTopologyVersion.NONE;
topVer = AffinityTopologyVersion.NONE;
+
+ discoCache = cctx.discovery().discoCache();
}
finally {
lock.writeLock().unlock();
@@ -293,6 +299,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
rebalancedTopVer = AffinityTopologyVersion.NONE;
topVer = exchId.topologyVersion();
+
+ discoCache = exchFut.discoCache();
}
finally {
lock.writeLock().unlock();
@@ -356,7 +364,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
ClusterNode loc = cctx.localNode();
- ClusterNode oldest = currentCoordinator();
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
@@ -481,7 +489,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (exchId.isLeft())
removeNode(exchId.nodeId());
- ClusterNode oldest = currentCoordinator();
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
if (log.isDebugEnabled())
log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
@@ -882,7 +890,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
AffinityTopologyVersion topVer,
GridDhtPartitionState state,
GridDhtPartitionState... states) {
- Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null;
+ Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.cacheAffinityNodes(cctx.cacheId())) : null;
lock.readLock().lock();
@@ -979,7 +987,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionFullMap partMap,
@Nullable Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
@@ -1112,7 +1120,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionMap2 parts,
@Nullable Map<Integer, Long> cntrMap,
boolean checkEvictions) {
@@ -1284,7 +1292,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
List<ClusterNode> affNodes = aff.get(p);
if (!affNodes.contains(cctx.localNode())) {
- Collection<UUID> nodeIds = F.nodeIds(nodes(p, topVer, OWNING));
+ List<ClusterNode> nodes = nodes(p, topVer, OWNING);
+ Collection<UUID> nodeIds = F.nodeIds(nodes);
// If all affinity nodes are owners, then evict partition from local node.
if (nodeIds.containsAll(F.nodeIds(affNodes))) {
@@ -1302,15 +1311,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
int affCnt = affNodes.size();
if (ownerCnt > affCnt) {
- List<ClusterNode> sorted = new ArrayList<>(cctx.discovery().nodes(nodeIds));
-
// Sort by node orders in ascending order.
- Collections.sort(sorted, CU.nodeComparator(true));
+ Collections.sort(nodes, CU.nodeComparator(true));
- int diff = sorted.size() - affCnt;
+ int diff = nodes.size() - affCnt;
for (int i = 0; i < diff; i++) {
- ClusterNode n = sorted.get(i);
+ ClusterNode n = nodes.get(i);
if (locId.equals(n.id())) {
part.rent(false);
@@ -1336,17 +1343,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/**
- * @return Current coordinator node.
- */
- @Nullable private ClusterNode currentCoordinator() {
- ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
-
- assert oldest != null || cctx.kernalContext().clientNode();
-
- return oldest;
- }
-
- /**
* Updates value for single partition.
*
* @param p Partition.
@@ -1356,7 +1352,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
*/
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) {
- ClusterNode oldest = currentCoordinator();
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
assert oldest != null || cctx.kernalContext().clientNode();
@@ -1421,7 +1417,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
private void removeNode(UUID nodeId) {
assert nodeId != null;
- ClusterNode oldest = CU.oldest(cctx.discovery().serverNodes(topVer));
+ ClusterNode oldest = discoCache.oldestAliveServerNode();
assert oldest != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 44780f1..c91f881 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2441,7 +2441,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
AffinityTopologyVersion topVer = req.topologyVersion();
- boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer);
+ boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer);
boolean readersOnly = false;
@@ -2676,7 +2676,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
AffinityTopologyVersion topVer = req.topologyVersion();
- boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer);
+ boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer);
CacheStorePartialUpdateException storeErr = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e945de9..7b5d09b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -44,7 +44,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
@@ -103,6 +103,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** Dummy reassign flag. */
private final boolean reassign;
+ /** */
+ @GridToStringExclude
+ private volatile DiscoCache discoCache;
+
/** Discovery event. */
private volatile DiscoveryEvent discoEvt;
@@ -147,9 +151,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** */
private boolean init;
- /** Topology snapshot. */
- private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>();
-
/** Last committed cache version before next topology version use. */
private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>();
@@ -332,6 +333,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
+ * @return Discovery cache.
+ */
+ public DiscoCache discoCache() {
+ return discoCache;
+ }
+
+ /**
* @param cacheId Cache ID to check.
* @param topVer Topology version.
* @return {@code True} if cache was added during this exchange.
@@ -374,11 +382,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
*
* @param exchId Exchange ID.
* @param discoEvt Discovery event.
+ * @param discoCache Discovery data cache.
*/
- public void onEvent(GridDhtPartitionExchangeId exchId, DiscoveryEvent discoEvt) {
+ public void onEvent(GridDhtPartitionExchangeId exchId, DiscoveryEvent discoEvt, DiscoCache discoCache) {
assert exchId.equals(this.exchId);
this.discoEvt = discoEvt;
+ this.discoCache = discoCache;
evtLatch.countDown();
}
@@ -435,7 +445,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
assert !dummy && !forcePreload : this;
try {
- srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topologyVersion()));
+ discoCache.updateAlives(cctx.discovery());
+
+ srvNodes = new ArrayList<>(discoCache.serverNodes());
remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId()))));
@@ -550,7 +562,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
if (updateTop && clientTop != null)
- cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false));
+ top.update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false));
}
top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId()));
@@ -842,7 +854,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
List<String> cachesWithoutNodes = null;
for (String name : cctx.cache().cacheNames()) {
- if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) {
+ if (discoCache.cacheAffinityNodes(name).isEmpty()) {
if (cachesWithoutNodes == null)
cachesWithoutNodes = new ArrayList<>();
@@ -1096,7 +1108,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* Cleans up resources to avoid excessive memory usage.
*/
public void cleanUp() {
- topSnapshot.set(null);
singleMsgs.clear();
fullMsgs.clear();
crd = null;
@@ -1252,7 +1263,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
try {
assert crd.isLocal();
- if (!crd.equals(cctx.discovery().serverNodes(topologyVersion()).get(0))) {
+ if (!crd.equals(discoCache.serverNodes().get(0))) {
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (!cacheCtx.isLocal())
cacheCtx.topology().beforeExchange(GridDhtPartitionsExchangeFuture.this, !centralizedAff);
@@ -1559,6 +1570,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
ClusterNode crd0;
+ discoCache.updateAlives(node);
+
synchronized (mux) {
if (!srvNodes.remove(node))
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index d26242d..99146aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -48,7 +48,6 @@ import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridClosureCallMode;
import org.apache.ignite.internal.GridKernalContext;
@@ -58,8 +57,9 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
@@ -167,7 +167,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
private IgniteInternalCache<Object, Object> cache;
/** Topology listener. */
- private GridLocalEventListener topLsnr = new TopologyListener();
+ private DiscoveryEventListener topLsnr = new TopologyListener();
static {
Set<IgniteProductVersion> versions = new TreeSet<>(new Comparator<IgniteProductVersion>() {
@@ -251,7 +251,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
cache = ctx.cache().utilityCache();
if (!ctx.clientNode())
- ctx.event().addLocalEventListener(topLsnr, EVTS);
+ ctx.event().addDiscoveryEventListener(topLsnr, EVTS);
try {
if (ctx.deploy().enabled())
@@ -314,7 +314,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
busyLock.block();
if (!ctx.clientNode())
- ctx.event().removeLocalEventListener(topLsnr);
+ ctx.event().removeDiscoveryEventListener(topLsnr);
Collection<ServiceContextImpl> ctxs = new ArrayList<>();
@@ -1568,9 +1568,9 @@ public class GridServiceProcessor extends GridProcessorAdapter {
/**
* Topology listener.
*/
- private class TopologyListener implements GridLocalEventListener {
+ private class TopologyListener implements DiscoveryEventListener {
/** {@inheritDoc} */
- @Override public void onEvent(Event evt) {
+ @Override public void onEvent(DiscoveryEvent evt, final DiscoCache discoCache) {
if (!busyLock.enterBusy())
return;
@@ -1588,11 +1588,14 @@ public class GridServiceProcessor extends GridProcessorAdapter {
}
}
else
- topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
+ topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0);
depExe.execute(new BusyRunnable() {
@Override public void run0() {
- ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
+ // In case the cache instance isn't tracked by DiscoveryManager anymore.
+ discoCache.updateAlives(ctx.discovery());
+
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
if (oldest != null && oldest.isLocal()) {
final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
index 31b4bc7..f0c50eb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
@@ -23,16 +23,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.Ignite;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgnitePredicate;
@@ -90,8 +85,8 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe
}
/** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
CacheConfiguration cCfg = defaultCacheConfiguration();
@@ -103,7 +98,7 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe
TcpDiscoverySpi disc = new TcpDiscoverySpi();
- if (clientMode && ((gridName.charAt(gridName.length() - 1) - '0') & 1) != 0)
+ if (clientMode && ((igniteInstanceName.charAt(igniteInstanceName.length() - 1) - '0') & 1) != 0)
cfg.setClientMode(true);
else
disc.setMaxMissedClientHeartbeats(50);
@@ -158,8 +153,6 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe
stopTempNodes();
latch.await();
-
- validateAlives();
}
}
@@ -200,55 +193,6 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe
}
/**
- * Validates that all node collections contain actual information.
- */
- @SuppressWarnings("SuspiciousMethodCalls")
- private void validateAlives() {
- for (Ignite g : alive) {
- log.info("Validate node: " + g.name());
-
- assertEquals("Unexpected nodes number for node: " + g.name(), PERM_NODES_CNT, g.cluster().nodes().size());
- }
-
- for (final Ignite g : alive) {
- IgniteKernal k = (IgniteKernal)g;
-
- GridDiscoveryManager discoMgr = k.context().discovery();
-
- final Collection<ClusterNode> currTop = g.cluster().nodes();
-
- long currVer = discoMgr.topologyVersion();
-
- long startVer = discoMgr.localNode().order();
-
- for (long v = currVer; v > currVer - GridDiscoveryManager.DISCOVERY_HISTORY_SIZE && v >= startVer; v--) {
- F.forAll(discoMgr.aliveCacheNodes(null, new AffinityTopologyVersion(v)),
- new IgnitePredicate<ClusterNode>() {
- @Override public boolean apply(ClusterNode e) {
- return currTop.contains(e);
- }
- });
-
- F.forAll(discoMgr.aliveRemoteCacheNodes(null, new AffinityTopologyVersion(v)),
- new IgnitePredicate<ClusterNode>() {
- @Override public boolean apply(ClusterNode e) {
- return currTop.contains(e) || g.cluster().localNode().equals(e);
- }
- });
-
- GridCacheSharedContext<?, ?> ctx = k.context().cache().context();
-
- ClusterNode oldest =
- ctx.discovery().oldestAliveCacheServerNode(new AffinityTopologyVersion(currVer));
-
- assertNotNull(oldest);
-
- assertTrue(currTop.contains(oldest));
- }
- }
- }
-
- /**
* Starts temporary nodes.
*
* @throws Exception If failed.
@@ -293,4 +237,4 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe
G.stop(g.name(), false);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java
index ba8fa5b..5de2910 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java
@@ -54,10 +54,10 @@ public abstract class GridDiscoveryManagerAttributesSelfTest extends GridCommonA
private static boolean binaryMarshallerEnabled;
/** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- if (gridName.equals(getTestGridName(1)))
+ if (igniteInstanceName.equals(getTestGridName(1)))
cfg.setClientMode(true);
if (binaryMarshallerEnabled)
@@ -160,7 +160,7 @@ public abstract class GridDiscoveryManagerAttributesSelfTest extends GridCommonA
if (fail)
fail("Node should not join");
}
- catch (Exception e) {
+ catch (Exception ignored) {
if (!fail)
fail("Node should join");
}
@@ -215,7 +215,7 @@ public abstract class GridDiscoveryManagerAttributesSelfTest extends GridCommonA
if (fail)
fail("Node should not join");
}
- catch (Exception e) {
+ catch (Exception ignored) {
if (!fail)
fail("Node should join");
}
@@ -346,8 +346,8 @@ public abstract class GridDiscoveryManagerAttributesSelfTest extends GridCommonA
*/
public static class RegularDiscovery extends GridDiscoveryManagerAttributesSelfTest {
/** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java
deleted file mode 100644
index c9179d4..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.managers.discovery;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.Ignition;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-
-/**
- *
- */
-public abstract class GridDiscoveryManagerSelfTest extends GridCommonAbstractTest {
- /** */
- private static final String CACHE_NAME = "cache";
-
- /** */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids();
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("IfMayBeConditional")
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- CacheConfiguration ccfg1 = defaultCacheConfiguration();
-
- ccfg1.setName(CACHE_NAME);
-
- CacheConfiguration ccfg2 = defaultCacheConfiguration();
-
- ccfg2.setName(null);
-
- if (gridName.equals(getTestGridName(1)))
- cfg.setClientMode(true);
- else {
- ccfg1.setNearConfiguration(null);
- ccfg2.setNearConfiguration(null);
-
- ccfg1.setCacheMode(PARTITIONED);
- ccfg2.setCacheMode(PARTITIONED);
-
- cfg.setCacheConfiguration(ccfg1, ccfg2);
- }
-
- TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
-
- discoverySpi.setIpFinder(IP_FINDER);
-
- cfg.setDiscoverySpi(discoverySpi);
-
- return cfg;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testHasNearCache() throws Exception {
- IgniteKernal g0 = (IgniteKernal)startGrid(0); // PARTITIONED_ONLY cache.
-
- AffinityTopologyVersion none = new AffinityTopologyVersion(-1);
- AffinityTopologyVersion one = new AffinityTopologyVersion(1);
- AffinityTopologyVersion two = new AffinityTopologyVersion(2, 2);
- AffinityTopologyVersion three = new AffinityTopologyVersion(3);
- AffinityTopologyVersion four = new AffinityTopologyVersion(4);
- AffinityTopologyVersion five = new AffinityTopologyVersion(5);
-
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, none));
- assertFalse(g0.context().discovery().hasNearCache(null, none));
-
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
- assertFalse(g0.context().discovery().hasNearCache(null, one));
-
- IgniteKernal g1 = (IgniteKernal)startGrid(1); // NEAR_ONLY cache.
-
- grid(1).createNearCache(null, new NearCacheConfiguration());
-
- grid(1).createNearCache(CACHE_NAME, new NearCacheConfiguration());
-
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two));
- assertFalse(g0.context().discovery().hasNearCache(null, one));
- assertTrue(g0.context().discovery().hasNearCache(null, two));
-
- assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, two));
- assertTrue(g1.context().discovery().hasNearCache(null, two));
-
- IgniteKernal g2 = (IgniteKernal)startGrid(2); // PARTITIONED_ONLY cache.
-
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, three));
- assertFalse(g0.context().discovery().hasNearCache(null, one));
- assertTrue(g0.context().discovery().hasNearCache(null, two));
- assertTrue(g0.context().discovery().hasNearCache(null, three));
-
- assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, two));
- assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, three));
- assertTrue(g1.context().discovery().hasNearCache(null, two));
- assertTrue(g1.context().discovery().hasNearCache(null, three));
-
- assertTrue(g2.context().discovery().hasNearCache(CACHE_NAME, three));
- assertTrue(g2.context().discovery().hasNearCache(null, three));
-
- stopGrid(2);
-
- // Wait all nodes are on version 4.
- for (;;) {
- if (F.forAll(
- Ignition.allGrids(),
- new IgnitePredicate<Ignite>() {
- @Override public boolean apply(Ignite ignite) {
- return ignite.cluster().topologyVersion() == 4;
- }
- }))
- break;
-
- Thread.sleep(1000);
- }
-
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, three));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, four));
- assertFalse(g0.context().discovery().hasNearCache(null, one));
- assertTrue(g0.context().discovery().hasNearCache(null, two));
- assertTrue(g0.context().discovery().hasNearCache(null, three));
- assertTrue(g0.context().discovery().hasNearCache(null, four));
-
- assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, three));
- assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, four));
- assertTrue(g1.context().discovery().hasNearCache(null, three));
- assertTrue(g1.context().discovery().hasNearCache(null, four));
-
- stopGrid(1);
-
- // Wait all nodes are on version 5.
- for (;;) {
- if (F.forAll(
- Ignition.allGrids(),
- new IgnitePredicate<Ignite>() {
- @Override public boolean apply(Ignite ignite) {
- return ignite.cluster().topologyVersion() == 5;
- }
- }))
- break;
-
- Thread.sleep(1000);
- }
-
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, three));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, four));
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, five));
-
- assertFalse(g0.context().discovery().hasNearCache(null, one));
- assertTrue(g0.context().discovery().hasNearCache(null, two));
- assertTrue(g0.context().discovery().hasNearCache(null, three));
- assertTrue(g0.context().discovery().hasNearCache(null, four));
- assertFalse(g0.context().discovery().hasNearCache(null, five));
- }
-
- /**
- *
- */
- public static class RegularDiscovery extends GridDiscoveryManagerSelfTest {
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
-
- return cfg;
- }
- }
-
- /**
- *
- */
- public static class ClientDiscovery extends GridDiscoveryManagerSelfTest {
- // No-op.
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteTopologyPrintFormatSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteTopologyPrintFormatSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteTopologyPrintFormatSelfTest.java
index 58992af..86ad458 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteTopologyPrintFormatSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteTopologyPrintFormatSelfTest.java
@@ -50,16 +50,16 @@ public class IgniteTopologyPrintFormatSelfTest extends GridCommonAbstractTest {
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
TcpDiscoverySpi disc = new TcpDiscoverySpi();
disc.setIpFinder(IP_FINDER);
- if (gridName.endsWith("client"))
+ if (igniteInstanceName.endsWith("client"))
cfg.setClientMode(true);
- if (gridName.endsWith("client_force_server")) {
+ if (igniteInstanceName.endsWith("client_force_server")) {
cfg.setClientMode(true);
disc.setForceServerMode(true);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index b28619c..985dddb 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@ -42,7 +42,6 @@ import org.apache.ignite.internal.managers.communication.GridCommunicationSendMe
import org.apache.ignite.internal.managers.deployment.GridDeploymentManagerStopSelfTest;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManagerAliveCacheSelfTest;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManagerAttributesSelfTest;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryManagerSelfTest;
import org.apache.ignite.internal.managers.discovery.IgniteTopologyPrintFormatSelfTest;
import org.apache.ignite.internal.managers.events.GridEventStorageManagerSelfTest;
import org.apache.ignite.internal.managers.swapspace.GridSwapSpaceManagerSelfTest;
@@ -111,8 +110,6 @@ public class IgniteKernalSelfTestSuite extends TestSuite {
suite.addTestSuite(GridDiscoveryManagerAttributesSelfTest.RegularDiscovery.class);
suite.addTestSuite(GridDiscoveryManagerAttributesSelfTest.ClientDiscovery.class);
suite.addTestSuite(GridDiscoveryManagerAliveCacheSelfTest.class);
- suite.addTestSuite(GridDiscoveryManagerSelfTest.RegularDiscovery.class);
- suite.addTestSuite(GridDiscoveryManagerSelfTest.ClientDiscovery.class);
suite.addTestSuite(GridDiscoveryEventSelfTest.class);
suite.addTestSuite(GridPortProcessorSelfTest.class);
suite.addTestSuite(GridHomePathSelfTest.class);
[2/3] ignite git commit: IGNITE-4779 Missed discovery data snapshot
during exchange processing (Backport from 2.0)
Posted by av...@apache.org.
IGNITE-4779 Missed discovery data snapshot during exchange processing (Backport from 2.0)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8273e670
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8273e670
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8273e670
Branch: refs/heads/master
Commit: 8273e6703944ea50b229a512398ac741eb713073
Parents: 2263283
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Mar 21 18:04:01 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Mar 21 18:04:01 2017 +0300
----------------------------------------------------------------------
.../internal/managers/discovery/DiscoCache.java | 310 ++++++++
.../discovery/GridDiscoveryManager.java | 710 ++++++-------------
.../eventstorage/DiscoveryEventListener.java | 33 +
.../eventstorage/GridEventStorageManager.java | 162 ++++-
.../affinity/GridAffinityAssignmentCache.java | 7 +-
.../cache/CacheAffinitySharedManager.java | 35 +-
.../cache/GridCacheAffinityManager.java | 2 +-
.../GridCachePartitionExchangeManager.java | 64 +-
.../dht/GridClientPartitionTopology.java | 20 +-
.../dht/GridDhtAssignmentFetchFuture.java | 7 +-
.../dht/GridDhtPartitionTopologyImpl.java | 44 +-
.../dht/atomic/GridDhtAtomicCache.java | 4 +-
.../GridDhtPartitionsExchangeFuture.java | 33 +-
.../service/GridServiceProcessor.java | 21 +-
.../GridDiscoveryManagerAliveCacheSelfTest.java | 64 +-
.../GridDiscoveryManagerAttributesSelfTest.java | 14 +-
.../discovery/GridDiscoveryManagerSelfTest.java | 214 ------
.../IgniteTopologyPrintFormatSelfTest.java | 8 +-
.../testsuites/IgniteKernalSelfTestSuite.java | 3 -
19 files changed, 854 insertions(+), 901 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
new file mode 100644
index 0000000..5247ac1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
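+ * Holds discovery node collections (all, remote, server, daemon, per-cache) and a set of alive node IDs.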
+ */
+public class DiscoCache {
+ /** Local node. */
+ private final ClusterNode loc;
+
+ /** Remote nodes. */
+ private final List<ClusterNode> rmtNodes;
+
+ /** All nodes. */
+ private final List<ClusterNode> allNodes;
+
+ /** All server nodes. */
+ private final List<ClusterNode> srvNodes;
+
+ /** Daemon nodes. */
+ private final List<ClusterNode> daemonNodes;
+
+ /** Server nodes with at least one cache configured. */
+ private final List<ClusterNode> srvNodesWithCaches;
+
+ /** All nodes with at least one cache configured. */
+ @GridToStringInclude
+ private final List<ClusterNode> allNodesWithCaches;
+
+ /** All remote nodes with at least one cache configured. */
+ @GridToStringInclude
+ private final List<ClusterNode> rmtNodesWithCaches;
+
+ /** Cache nodes by cache ID. */
+ @GridToStringInclude
+ private final Map<Integer, List<ClusterNode>> allCacheNodes;
+
+ /** Affinity cache nodes by cache ID. */
+ @GridToStringInclude
+ private final Map<Integer, List<ClusterNode>> affCacheNodes;
+
+ /** Node map. */
+ private final Map<UUID, ClusterNode> nodeMap;
+
+ /** Caches where at least one node has near cache enabled. */
+ @GridToStringInclude
+ private final Set<Integer> nearEnabledCaches;
+
+ /** Alive nodes. */
+ private final Set<UUID> alives = new GridConcurrentHashSet<>();
+
+ /**
+ * @param loc Local node.
+ * @param rmtNodes Remote nodes.
+ * @param allNodes All nodes.
+ * @param srvNodes Server nodes.
+ * @param daemonNodes Daemon nodes.
+ * @param srvNodesWithCaches Server nodes with at least one cache configured.
+ * @param allNodesWithCaches All nodes with at least one cache configured.
+ * @param rmtNodesWithCaches Remote nodes with at least one cache configured.
+ * @param allCacheNodes Cache nodes by cache ID.
+ * @param affCacheNodes Affinity cache nodes by cache ID.
+ * @param nodeMap Node map.
+ * @param nearEnabledCaches Caches where at least one node has near cache enabled.
+ * @param alives Alive nodes.
+ */
+ DiscoCache(ClusterNode loc,
+ List<ClusterNode> rmtNodes,
+ List<ClusterNode> allNodes,
+ List<ClusterNode> srvNodes,
+ List<ClusterNode> daemonNodes,
+ List<ClusterNode> srvNodesWithCaches,
+ List<ClusterNode> allNodesWithCaches,
+ List<ClusterNode> rmtNodesWithCaches,
+ Map<Integer, List<ClusterNode>> allCacheNodes,
+ Map<Integer, List<ClusterNode>> affCacheNodes,
+ Map<UUID, ClusterNode> nodeMap,
+ Set<Integer> nearEnabledCaches,
+ Set<UUID> alives) {
+ this.loc = loc;
+ this.rmtNodes = rmtNodes;
+ this.allNodes = allNodes;
+ this.srvNodes = srvNodes;
+ this.daemonNodes = daemonNodes;
+ this.srvNodesWithCaches = srvNodesWithCaches;
+ this.allNodesWithCaches = allNodesWithCaches;
+ this.rmtNodesWithCaches = rmtNodesWithCaches;
+ this.allCacheNodes = allCacheNodes;
+ this.affCacheNodes = affCacheNodes;
+ this.nodeMap = nodeMap;
+ this.nearEnabledCaches = nearEnabledCaches;
+ this.alives.addAll(alives);
+ }
+
+ /** @return Local node. */
+ public ClusterNode localNode() {
+ return loc;
+ }
+
+ /** @return Remote nodes. */
+ public List<ClusterNode> remoteNodes() {
+ return rmtNodes;
+ }
+
+ /** @return All nodes. */
+ public List<ClusterNode> allNodes() {
+ return allNodes;
+ }
+
+ /** @return Server nodes. */
+ public List<ClusterNode> serverNodes() {
+ return srvNodes;
+ }
+
+ /** @return Daemon nodes. */
+ public List<ClusterNode> daemonNodes() {
+ return daemonNodes;
+ }
+
+ /** @return Server nodes with at least one cache configured. */
+ public List<ClusterNode> serverNodesWithCaches() {
+ return srvNodesWithCaches;
+ }
+
+ /**
+ * Gets all remote nodes that have at least one cache configured.
+ *
+ * @return Collection of nodes.
+ */
+ public List<ClusterNode> remoteNodesWithCaches() {
+ return rmtNodesWithCaches;
+ }
+
+ /**
+ * Gets collection of nodes with at least one cache configured.
+ *
+ * @return Collection of nodes.
+ */
+ public List<ClusterNode> allNodesWithCaches() {
+ return allNodesWithCaches;
+ }
+
+ /**
+ * Gets collection of alive server nodes.
+ *
+ * @return Collection of nodes.
+ */
+ public Collection<ClusterNode> aliveServerNodes() {
+ return F.view(serverNodes(), new P1<ClusterNode>() {
+ @Override public boolean apply(ClusterNode node) {
+ return alives.contains(node.id());
+ }
+ });
+ }
+
+ /**
+ * Gets collection of alive server nodes with at least one cache configured.
+ *
+ * @return Collection of nodes.
+ */
+ public Collection<ClusterNode> aliveServerNodesWithCaches() {
+ return F.view(serverNodesWithCaches(), new P1<ClusterNode>() {
+ @Override public boolean apply(ClusterNode node) {
+ return alives.contains(node.id());
+ }
+ });
+ }
+
+ /**
+ * @return Oldest alive server node.
+ */
+ public @Nullable ClusterNode oldestAliveServerNode(){
+ Iterator<ClusterNode> it = aliveServerNodes().iterator();
+ return it.hasNext() ? it.next() : null;
+ }
+
+ /**
+ * @return Oldest alive server node with at least one cache configured.
+ */
+ public @Nullable ClusterNode oldestAliveServerNodeWithCache(){
+ Iterator<ClusterNode> it = aliveServerNodesWithCaches().iterator();
+ return it.hasNext() ? it.next() : null;
+ }
+
+ /**
+ * Gets all nodes that have cache with given name.
+ *
+ * @param cacheName Cache name.
+ * @return Collection of nodes.
+ */
+ public List<ClusterNode> cacheNodes(@Nullable String cacheName) {
+ return cacheNodes(CU.cacheId(cacheName));
+ }
+
+ /**
+ * Gets all nodes that have cache with given ID.
+ *
+ * @param cacheId Cache ID.
+ * @return Collection of nodes.
+ */
+ public List<ClusterNode> cacheNodes(Integer cacheId) {
+ return emptyIfNull(allCacheNodes.get(cacheId));
+ }
+
+ /**
+ * Gets all nodes that have cache with given name and should participate in affinity calculation. With
+ * partitioned cache nodes with near-only cache do not participate in affinity node calculation.
+ *
+ * @param cacheName Cache name.
+ * @return Collection of nodes.
+ */
+ public List<ClusterNode> cacheAffinityNodes(@Nullable String cacheName) {
+ return cacheAffinityNodes(CU.cacheId(cacheName));
+ }
+
+ /**
+ * Gets all nodes that have cache with given ID and should participate in affinity calculation. With
+ * partitioned cache nodes with near-only cache do not participate in affinity node calculation.
+ *
+ * @param cacheId Cache ID.
+ * @return Collection of nodes.
+ */
+ public List<ClusterNode> cacheAffinityNodes(int cacheId) {
+ return emptyIfNull(affCacheNodes.get(cacheId));
+ }
+
+ /**
+ * Checks if cache with given ID has at least one node with near cache enabled.
+ *
+ * @param cacheId Cache ID.
+ * @return {@code True} if cache with given ID has at least one node with near cache enabled.
+ */
+ public boolean hasNearCache(int cacheId) {
+ return nearEnabledCaches.contains(cacheId);
+ }
+
+ /**
+ * @param id Node ID.
+ * @return Node.
+ */
+ public @Nullable ClusterNode node(UUID id) {
+ return nodeMap.get(id);
+ }
+
+ /**
+ * Removes the given node from the set of alive nodes.
+ *
+ * @param rmvd Removed node.
+ */
+ public void updateAlives(ClusterNode rmvd) {
+ alives.remove(rmvd.id());
+ }
+
+ /**
+ * Removes nodes that are no longer alive from the set of alive nodes.
+ *
+ * @param discovery Discovery manager.
+ */
+ public void updateAlives(GridDiscoveryManager discovery) {
+ for (UUID alive : alives) {
+ if (!discovery.alive(alive))
+ alives.remove(alive);
+ }
+ }
+
+ /**
+ * @param nodes Cluster nodes.
+ * @return Empty list if nodes list is {@code null}, otherwise the given list.
+ */
+ private List<ClusterNode> emptyIfNull(List<ClusterNode> nodes) {
+ return nodes == null ? Collections.<ClusterNode>emptyList() : nodes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DiscoCache.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 9aa4db1..80549dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -35,14 +35,12 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.NavigableMap;
import java.util.Set;
-import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@@ -74,24 +72,23 @@ import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
-import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridTuple5;
+import org.apache.ignite.internal.util.lang.GridTuple6;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -99,7 +96,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.security.SecurityCredentials;
import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
@@ -113,6 +109,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -233,7 +230,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
private long segChkFreq;
/** Local node join to topology event. */
- private GridFutureAdapter<DiscoveryEvent> locJoinEvt = new GridFutureAdapter<>();
+ private GridFutureAdapter<T2<DiscoveryEvent, DiscoCache>> locJoin = new GridFutureAdapter<>();
/** GC CPU load. */
private volatile double gcCpuLoad;
@@ -542,20 +539,25 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
}
+ final DiscoCache discoCache;
+
// Put topology snapshot into discovery history.
// There is no race possible between history maintenance and concurrent discovery
// event notifications, since SPI notifies manager about all events from this listener.
if (verChanged) {
- DiscoCache cache = new DiscoCache(locNode, F.view(topSnapshot, F.remoteNodes(locNode.id())));
+ discoCache = createDiscoCache(locNode, topSnapshot);
- discoCacheHist.put(nextTopVer, cache);
+ discoCacheHist.put(nextTopVer, discoCache);
- boolean set = updateTopologyVersionIfGreater(nextTopVer, cache);
+ boolean set = updateTopologyVersionIfGreater(nextTopVer, discoCache);
assert set || topVer == 0 : "Topology version has not been updated [this.topVer=" +
topSnap + ", topVer=" + topVer + ", node=" + node +
", evt=" + U.gridEventName(type) + ']';
}
+ else
+ // Current version.
+ discoCache = discoCache();
// If this is a local join event, just save it and do not notify listeners.
if (type == EVT_NODE_JOINED && node.id().equals(locNode.id())) {
@@ -563,7 +565,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
gridStartTime = getSpi().getGridStartTime();
updateTopologyVersionIfGreater(new AffinityTopologyVersion(locNode.order()),
- new DiscoCache(localNode(), F.view(topSnapshot, F.remoteNodes(locNode.id()))));
+ discoCache);
startLatch.countDown();
@@ -573,14 +575,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
discoEvt.eventNode(node);
discoEvt.type(EVT_NODE_JOINED);
- discoEvt.topologySnapshot(topVer, new ArrayList<>(
- F.viewReadOnly(topSnapshot, new C1<ClusterNode, ClusterNode>() {
- @Override public ClusterNode apply(ClusterNode e) {
- return e;
- }
- }, FILTER_DAEMON)));
+ discoEvt.topologySnapshot(topVer, new ArrayList<>(F.view(topSnapshot, FILTER_DAEMON)));
- locJoinEvt.onDone(discoEvt);
+ locJoin.onDone(new T2<>(discoEvt, discoCache));
return;
}
@@ -595,7 +592,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
((IgniteKernal)ctx.grid()).onDisconnected();
- locJoinEvt = new GridFutureAdapter<>();
+ locJoin = new GridFutureAdapter<>();
registeredCaches.clear();
@@ -608,7 +605,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
topHist.clear();
topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
- new DiscoCache(locNode, Collections.<ClusterNode>emptySet())));
+ createDiscoCache(locNode, Collections.<ClusterNode>emptySet())));
}
else if (type == EVT_CLIENT_NODE_RECONNECTED) {
assert locNode.isClient() : locNode;
@@ -625,7 +622,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
try {
fut.get();
- discoWrk.addEvent(type, nextTopVer, node, topSnapshot, null);
+ discoWrk.addEvent(type, nextTopVer, node, discoCache, topSnapshot, null);
}
catch (IgniteException ignore) {
// No-op.
@@ -637,7 +634,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
if (type == EVT_CLIENT_NODE_DISCONNECTED || type == EVT_NODE_SEGMENTED || !ctx.clientDisconnected())
- discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg);
+ discoWrk.addEvent(type, nextTopVer, node, discoCache, topSnapshot, customMsg);
}
});
@@ -1333,8 +1330,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
U.join(segChkThread, log);
}
- if (!locJoinEvt.isDone())
- locJoinEvt.onDone(
+ if (!locJoin.isDone())
+ locJoin.onDone(
new IgniteCheckedException("Failed to wait for local node joined event (grid is stopping)."));
}
@@ -1528,7 +1525,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
*
* @return Discovery collection cache.
*/
- private DiscoCache discoCache() {
+ public DiscoCache discoCache() {
Snapshot cur = topSnap.get();
assert cur != null;
@@ -1577,7 +1574,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Collection of cache nodes.
*/
public Collection<ClusterNode> nodes(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).allNodes();
+ return resolveDiscoCache(CU.cacheId(null), topVer).allNodes();
}
/**
@@ -1585,7 +1582,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return All server nodes for given topology version.
*/
public List<ClusterNode> serverNodes(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).srvNodes;
+ return resolveDiscoCache(CU.cacheId(null), topVer).serverNodes();
}
/**
@@ -1596,7 +1593,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Node.
*/
public ClusterNode node(AffinityTopologyVersion topVer, UUID id) {
- return resolveDiscoCache(null, topVer).node(id);
+ return resolveDiscoCache(CU.cacheId(null), topVer).node(id);
}
/**
@@ -1607,49 +1604,38 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Collection of cache nodes.
*/
public Collection<ClusterNode> cacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheName, topVer).cacheNodes(cacheName, topVer.topologyVersion());
+ return resolveDiscoCache(CU.cacheId(cacheName), topVer).cacheNodes(cacheName);
}
/**
- * Gets all nodes with at least one cache configured.
+ * Gets cache nodes for cache with given ID.
*
+ * @param cacheId Cache ID.
* @param topVer Topology version.
* @return Collection of cache nodes.
*/
- public Collection<ClusterNode> cacheNodes(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).allNodesWithCaches(topVer.topologyVersion());
+ public Collection<ClusterNode> cacheNodes(int cacheId, AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(cacheId, topVer).cacheNodes(cacheId);
}
/**
- * Gets cache remote nodes for cache with given name.
- *
- * @param topVer Topology version.
- * @return Collection of cache nodes.
- */
- public Collection<ClusterNode> remoteCacheNodes(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).remoteCacheNodes(topVer.topologyVersion());
- }
-
- /**
- * Gets cache nodes for cache with given name.
+ * Gets all nodes with at least one cache configured.
*
- * @param cacheName Cache name.
* @param topVer Topology version.
* @return Collection of cache nodes.
*/
- Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheName, topVer).aliveCacheNodes(cacheName, topVer.topologyVersion());
+ public Collection<ClusterNode> cacheNodes(AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(CU.cacheId(null), topVer).allNodesWithCaches();
}
/**
* Gets cache remote nodes for cache with given name.
*
- * @param cacheName Cache name.
* @param topVer Topology version.
* @return Collection of cache nodes.
*/
- Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheName, topVer).aliveRemoteCacheNodes(cacheName, topVer.topologyVersion());
+ public Collection<ClusterNode> remoteCacheNodes(AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(CU.cacheId(null), topVer).remoteNodesWithCaches();
}
/**
@@ -1657,11 +1643,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Oldest alive server nodes with at least one cache configured.
*/
@Nullable public ClusterNode oldestAliveCacheServerNode(AffinityTopologyVersion topVer) {
- DiscoCache cache = resolveDiscoCache(null, topVer);
-
- Map.Entry<ClusterNode, Boolean> e = cache.aliveSrvNodesWithCaches.firstEntry();
-
- return e != null ? e.getKey() : null;
+ return resolveDiscoCache(CU.cacheId(null), topVer).oldestAliveServerNodeWithCache();
}
/**
@@ -1672,7 +1654,20 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Collection of cache affinity nodes.
*/
public Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheName, topVer).cacheAffinityNodes(cacheName, topVer.topologyVersion());
+ int cacheId = CU.cacheId(cacheName);
+
+ return resolveDiscoCache(cacheId, topVer).cacheAffinityNodes(cacheId);
+ }
+
+ /**
+ * Gets cache nodes for cache with given ID that participate in affinity calculation.
+ * <p>
+ * The nodes are taken from the discovery cache resolved for the requested topology version.
+ *
+ * @param cacheId Cache ID.
+ * @param topVer Topology version.
+ * @return Collection of cache affinity nodes.
+ * @throws IgniteException If no discovery cache can be resolved for {@code topVer}.
+ */
+ public Collection<ClusterNode> cacheAffinityNodes(int cacheId, AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(cacheId, topVer).cacheAffinityNodes(cacheId);
}
/**
@@ -1742,31 +1737,34 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * Checks if cache with given name has at least one node with near cache enabled.
+ * Checks if cache with given ID has at least one node with near cache enabled.
*
- * @param cacheName Cache name.
+ * @param cacheId Cache ID.
* @param topVer Topology version.
* @return {@code True} if cache with given name has at least one node with near cache enabled.
*/
- public boolean hasNearCache(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheName, topVer).hasNearCache(cacheName);
+ public boolean hasNearCache(int cacheId, AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(cacheId, topVer).hasNearCache(cacheId);
}
/**
* Gets discovery cache for given topology version.
*
- * @param cacheName Cache name (participates in exception message).
+ * @param cacheId Cache ID (participates in exception message).
* @param topVer Topology version.
* @return Discovery cache.
*/
- private DiscoCache resolveDiscoCache(@Nullable String cacheName, AffinityTopologyVersion topVer) {
+ private DiscoCache resolveDiscoCache(int cacheId, AffinityTopologyVersion topVer) {
Snapshot snap = topSnap.get();
DiscoCache cache = AffinityTopologyVersion.NONE.equals(topVer) || topVer.equals(snap.topVer) ?
snap.discoCache : discoCacheHist.get(topVer);
if (cache == null) {
- throw new IgniteException("Failed to resolve nodes topology [cacheName=" + cacheName +
+ DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheId);
+
+ throw new IgniteException("Failed to resolve nodes topology [" +
+ "cacheName=" + (desc != null ? desc.cacheConfiguration().getName() : "N/A") +
", topVer=" + topVer +
", history=" + discoCacheHist.keySet() +
", snap=" + snap +
@@ -1817,7 +1815,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** @return Event that represents a local node joined to topology. */
public DiscoveryEvent localJoinEvent() {
try {
- return locJoinEvt.get();
+ return locJoin.get().get1();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * @return Tuple that consists of a local join event and discovery cache at the join time.
+ */
+ public T2<DiscoveryEvent, DiscoCache> localJoin() {
+ try {
+ return locJoin.get();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -1891,6 +1901,114 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * Builds a discovery cache from a topology snapshot.
+ * <p>
+ * Every snapshot node is put into the node map, and nodes for which {@code alive(node)} holds get
+ * their IDs recorded as alive. Daemon nodes go only to the daemon list; all other nodes are
+ * distributed among the all/remote/server lists in snapshot iteration order. For each entry of
+ * {@code registeredCaches} whose predicate accepts a non-daemon node, the node is also added to
+ * the "with caches" sets (sorted by {@code GridNodeOrderComparator}) and to the node lists keyed
+ * by cache ID; caches with a near node are collected by cache ID as well.
+ *
+ * @param loc Local node.
+ * @param topSnapshot Topology snapshot.
+ * @return Newly created discovery cache.
+ */
+ @NotNull private DiscoCache createDiscoCache(ClusterNode loc, Collection<ClusterNode> topSnapshot) {
+ HashSet<UUID> alives = U.newHashSet(topSnapshot.size());
+ HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size());
+
+ ArrayList<ClusterNode> daemonNodes = new ArrayList<>(topSnapshot.size());
+ ArrayList<ClusterNode> srvNodes = new ArrayList<>(topSnapshot.size());
+ ArrayList<ClusterNode> rmtNodes = new ArrayList<>(topSnapshot.size());
+ ArrayList<ClusterNode> allNodes = new ArrayList<>(topSnapshot.size());
+
+ for (ClusterNode node : topSnapshot) {
+ if (alive(node))
+ alives.add(node.id());
+
+ if (node.isDaemon())
+ daemonNodes.add(node);
+ else {
+ allNodes.add(node);
+
+ if (!node.isLocal())
+ rmtNodes.add(node);
+
+ if (!CU.clientNode(node))
+ srvNodes.add(node);
+ }
+
+ nodeMap.put(node.id(), node);
+ }
+
+ assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" +
+ " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']';
+
+ Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size());
+ Map<Integer, List<ClusterNode>> affCacheNodes = U.newHashMap(allNodes.size());
+
+ Set<ClusterNode> allNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+ Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+ Set<ClusterNode> srvNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+
+ Set<Integer> nearEnabledCaches = new HashSet<>();
+
+ for (ClusterNode node : allNodes) {
+ assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
+ assert !node.isDaemon();
+
+ for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) {
+ String cacheName = entry.getKey();
+ CachePredicate filter = entry.getValue();
+
+ if (filter.cacheNode(node)) {
+ allNodesWithCaches.add(node);
+
+ if(!CU.clientNode(node))
+ srvNodesWithCaches.add(node);
+
+ if (!node.isLocal())
+ rmtNodesWithCaches.add(node);
+
+ addToMap(allCacheNodes, cacheName, node);
+
+ if (filter.dataNode(node))
+ addToMap(affCacheNodes, cacheName, node);
+
+ if (filter.nearNode(node))
+ nearEnabledCaches.add(CU.cacheId(cacheName));
+ }
+ }
+ }
+
+ return new DiscoCache(
+ loc,
+ Collections.unmodifiableList(rmtNodes),
+ Collections.unmodifiableList(allNodes),
+ Collections.unmodifiableList(srvNodes),
+ Collections.unmodifiableList(daemonNodes),
+ U.sealList(srvNodesWithCaches),
+ U.sealList(allNodesWithCaches),
+ U.sealList(rmtNodesWithCaches),
+ Collections.unmodifiableMap(allCacheNodes),
+ Collections.unmodifiableMap(affCacheNodes),
+ Collections.unmodifiableMap(nodeMap),
+ Collections.unmodifiableSet(nearEnabledCaches),
+ alives);
+ }
+
+ /**
+ * Appends a node to the list stored under the ID of the given cache, creating the list on first use.
+ *
+ * @param cacheMap Map to add to, keyed by cache ID.
+ * @param cacheName Cache name; converted to the map key with {@code CU.cacheId}.
+ * @param rich Node to add.
+ */
+ private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
+ List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName));
+
+ if (cacheNodes == null) {
+ cacheNodes = new ArrayList<>();
+
+ cacheMap.put(CU.cacheId(cacheName), cacheNodes);
+ }
+
+ cacheNodes.add(rich);
+ }
+
+ /**
* Updates topology version if current version is smaller than updated.
*
* @param updated Updated topology version.
@@ -1991,8 +2109,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
lastChk = now;
if (!segValid) {
- discoWrk.addEvent(EVT_NODE_SEGMENTED, AffinityTopologyVersion.NONE, getSpi().getLocalNode(),
- Collections.<ClusterNode>emptyList(), null);
+ List<ClusterNode> empty = Collections.emptyList();
+
+ ClusterNode node = getSpi().getLocalNode();
+
+ discoWrk.addEvent(EVT_NODE_SEGMENTED,
+ AffinityTopologyVersion.NONE,
+ node,
+ createDiscoCache(node, empty),
+ empty,
+ null);
lastSegChkRes.set(false);
}
@@ -2012,8 +2138,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** Worker for discovery events. */
private class DiscoveryWorker extends GridWorker {
/** Event queue. */
- private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>,
- DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>();
+ private final BlockingQueue<GridTuple6<Integer, AffinityTopologyVersion, ClusterNode,
+ DiscoCache, Collection<ClusterNode>, DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>();
/** Node segmented event fired flag. */
private boolean nodeSegFired;
@@ -2031,10 +2157,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @param type Discovery event type. See {@link DiscoveryEvent} for more details.
* @param topVer Topology version.
* @param node Remote node this event is connected with.
+ * @param discoCache Discovery cache.
* @param topSnapshot Topology snapshot.
*/
@SuppressWarnings("RedundantTypeArguments")
- private void recordEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) {
+ private void recordEvent(int type, long topVer, ClusterNode node, DiscoCache discoCache, Collection<ClusterNode> topSnapshot) {
assert node != null;
if (ctx.event().isRecordable(type)) {
@@ -2043,7 +2170,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
evt.node(ctx.discovery().localNode());
evt.eventNode(node);
evt.type(type);
-
evt.topologySnapshot(topVer, U.<ClusterNode, ClusterNode>arrayList(topSnapshot, FILTER_DAEMON));
if (type == EVT_NODE_METRICS_UPDATED)
@@ -2070,7 +2196,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
else
assert false;
- ctx.event().record(evt);
+ ctx.event().record(evt, discoCache);
}
}
@@ -2078,6 +2204,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @param type Event type.
* @param topVer Topology version.
* @param node Node.
+ * @param discoCache Discovery cache.
* @param topSnapshot Topology snapshot.
* @param data Custom message.
*/
@@ -2085,12 +2212,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
int type,
AffinityTopologyVersion topVer,
ClusterNode node,
+ DiscoCache discoCache,
Collection<ClusterNode> topSnapshot,
@Nullable DiscoveryCustomMessage data
) {
assert node != null : data;
- evts.add(new GridTuple5<>(type, topVer, node, topSnapshot, data));
+ evts.add(new GridTuple6<>(type, topVer, node, discoCache, topSnapshot, data));
}
/**
@@ -2127,7 +2255,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** @throws InterruptedException If interrupted. */
@SuppressWarnings("DuplicateCondition")
private void body0() throws InterruptedException {
- GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>,
+ GridTuple6<Integer, AffinityTopologyVersion, ClusterNode, DiscoCache, Collection<ClusterNode>,
DiscoveryCustomMessage> evt = evts.take();
int type = evt.get1();
@@ -2260,11 +2388,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
customEvt.node(ctx.discovery().localNode());
customEvt.eventNode(node);
customEvt.type(type);
- customEvt.topologySnapshot(topVer.topologyVersion(), evt.get4());
+ customEvt.topologySnapshot(topVer.topologyVersion(), evt.get5());
customEvt.affinityTopologyVersion(topVer);
- customEvt.customMessage(evt.get5());
+ customEvt.customMessage(evt.get6());
- ctx.event().record(customEvt);
+ ctx.event().record(customEvt, evt.get4());
}
return;
@@ -2278,7 +2406,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
assert false : "Invalid discovery event: " + type;
}
- recordEvent(type, topVer.topologyVersion(), node, evt.get4());
+ recordEvent(type, topVer.topologyVersion(), node, evt.get4(), evt.get5());
if (segmented)
onSegmentation();
@@ -2488,432 +2616,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
}
- /** Cache for discovery collections. */
- private class DiscoCache {
- /** Remote nodes. */
- private final List<ClusterNode> rmtNodes;
-
- /** All nodes. */
- private final List<ClusterNode> allNodes;
-
- /** All server nodes. */
- private final List<ClusterNode> srvNodes;
-
- /** All nodes with at least one cache configured. */
- @GridToStringInclude
- private final Collection<ClusterNode> allNodesWithCaches;
-
- /** All nodes with at least one cache configured. */
- @GridToStringInclude
- private final Collection<ClusterNode> rmtNodesWithCaches;
-
- /** Cache nodes by cache name. */
- @GridToStringInclude
- private final Map<String, Collection<ClusterNode>> allCacheNodes;
-
- /** Remote cache nodes by cache name. */
- @GridToStringInclude
- private final Map<String, Collection<ClusterNode>> rmtCacheNodes;
-
- /** Cache nodes by cache name. */
- @GridToStringInclude
- private final Map<String, Collection<ClusterNode>> affCacheNodes;
-
- /** Caches where at least one node has near cache enabled. */
- @GridToStringInclude
- private final Set<String> nearEnabledCaches;
-
- /** Nodes grouped by version. */
- private final NavigableMap<IgniteProductVersion, Collection<ClusterNode>> nodesByVer;
-
- /** Daemon nodes. */
- private final List<ClusterNode> daemonNodes;
-
- /** Node map. */
- private final Map<UUID, ClusterNode> nodeMap;
-
- /** Local node. */
- private final ClusterNode loc;
-
- /** Highest node order. */
- private final long maxOrder;
-
- /**
- * Cached alive nodes list. As long as this collection doesn't accept {@code null}s use {@link
- * #maskNull(String)} before passing raw cache names to it.
- */
- private final ConcurrentMap<String, Collection<ClusterNode>> aliveCacheNodes;
-
- /**
- * Cached alive remote nodes list. As long as this collection doesn't accept {@code null}s use {@link
- * #maskNull(String)} before passing raw cache names to it.
- */
- private final ConcurrentMap<String, Collection<ClusterNode>> aliveRmtCacheNodes;
-
- /**
- * Cached alive server remote nodes with caches.
- */
- private final ConcurrentSkipListMap<ClusterNode, Boolean> aliveSrvNodesWithCaches;
-
- /**
- * @param loc Local node.
- * @param rmts Remote nodes.
- */
- private DiscoCache(ClusterNode loc, Collection<ClusterNode> rmts) {
- this.loc = loc;
-
- rmtNodes = Collections.unmodifiableList(new ArrayList<>(F.view(rmts, FILTER_DAEMON)));
-
- assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" +
- " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']';
-
- List<ClusterNode> all = new ArrayList<>(rmtNodes.size() + 1);
-
- if (!loc.isDaemon())
- all.add(loc);
-
- all.addAll(rmtNodes);
-
- Collections.sort(all, GridNodeOrderComparator.INSTANCE);
-
- allNodes = Collections.unmodifiableList(all);
-
- Map<String, Collection<ClusterNode>> cacheMap = new HashMap<>(allNodes.size(), 1.0f);
- Map<String, Collection<ClusterNode>> rmtCacheMap = new HashMap<>(allNodes.size(), 1.0f);
- Map<String, Collection<ClusterNode>> dhtNodesMap = new HashMap<>(allNodes.size(), 1.0f);
- Collection<ClusterNode> nodesWithCaches = new HashSet<>(allNodes.size());
- Collection<ClusterNode> rmtNodesWithCaches = new HashSet<>(allNodes.size());
-
- aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
- aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
- aliveSrvNodesWithCaches = new ConcurrentSkipListMap<>(GridNodeOrderComparator.INSTANCE);
- nodesByVer = new TreeMap<>();
-
- long maxOrder0 = 0;
-
- Set<String> nearEnabledSet = new HashSet<>();
-
- List<ClusterNode> srvNodes = new ArrayList<>();
-
- for (ClusterNode node : allNodes) {
- assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
- assert !node.isDaemon();
-
- if (!CU.clientNode(node))
- srvNodes.add(node);
-
- if (node.order() > maxOrder0)
- maxOrder0 = node.order();
-
- boolean hasCaches = false;
-
- for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) {
- String cacheName = entry.getKey();
-
- CachePredicate filter = entry.getValue();
-
- if (filter.cacheNode(node)) {
- nodesWithCaches.add(node);
-
- if (!loc.id().equals(node.id()))
- rmtNodesWithCaches.add(node);
-
- addToMap(cacheMap, cacheName, node);
-
- if (alive(node.id()))
- addToMap(aliveCacheNodes, maskNull(cacheName), node);
-
- if (filter.dataNode(node))
- addToMap(dhtNodesMap, cacheName, node);
-
- if (filter.nearNode(node))
- nearEnabledSet.add(cacheName);
-
- if (!loc.id().equals(node.id())) {
- addToMap(rmtCacheMap, cacheName, node);
-
- if (alive(node.id()))
- addToMap(aliveRmtCacheNodes, maskNull(cacheName), node);
- }
-
- hasCaches = true;
- }
- }
-
- if (hasCaches && alive(node.id()) && !CU.clientNode(node))
- aliveSrvNodesWithCaches.put(node, Boolean.TRUE);
-
- IgniteProductVersion nodeVer = U.productVersion(node);
-
- // Create collection for this version if it does not exist.
- Collection<ClusterNode> nodes = nodesByVer.get(nodeVer);
-
- if (nodes == null) {
- nodes = new ArrayList<>(allNodes.size());
-
- nodesByVer.put(nodeVer, nodes);
- }
-
- nodes.add(node);
- }
-
- Collections.sort(srvNodes, CU.nodeComparator(true));
-
- // Need second iteration to add this node to all previous node versions.
- for (ClusterNode node : allNodes) {
- IgniteProductVersion nodeVer = U.productVersion(node);
-
- // Get all versions lower or equal node's version.
- NavigableMap<IgniteProductVersion, Collection<ClusterNode>> updateView =
- nodesByVer.headMap(nodeVer, false);
-
- for (Collection<ClusterNode> prevVersions : updateView.values())
- prevVersions.add(node);
- }
-
- maxOrder = maxOrder0;
-
- allCacheNodes = Collections.unmodifiableMap(cacheMap);
- rmtCacheNodes = Collections.unmodifiableMap(rmtCacheMap);
- affCacheNodes = Collections.unmodifiableMap(dhtNodesMap);
- allNodesWithCaches = Collections.unmodifiableCollection(nodesWithCaches);
- this.rmtNodesWithCaches = Collections.unmodifiableCollection(rmtNodesWithCaches);
- nearEnabledCaches = Collections.unmodifiableSet(nearEnabledSet);
- this.srvNodes = Collections.unmodifiableList(srvNodes);
-
- daemonNodes = Collections.unmodifiableList(new ArrayList<>(
- F.view(F.concat(false, loc, rmts), F0.not(FILTER_DAEMON))));
-
- Map<UUID, ClusterNode> nodeMap = new HashMap<>(allNodes().size() + daemonNodes.size(), 1.0f);
-
- for (ClusterNode n : F.concat(false, allNodes(), daemonNodes()))
- nodeMap.put(n.id(), n);
-
- this.nodeMap = nodeMap;
- }
-
- /**
- * Adds node to map.
- *
- * @param cacheMap Map to add to.
- * @param cacheName Cache name.
- * @param rich Node to add
- */
- private void addToMap(Map<String, Collection<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
- Collection<ClusterNode> cacheNodes = cacheMap.get(cacheName);
-
- if (cacheNodes == null) {
- cacheNodes = new ArrayList<>(allNodes.size());
-
- cacheMap.put(cacheName, cacheNodes);
- }
-
- cacheNodes.add(rich);
- }
-
- /** @return Local node. */
- ClusterNode localNode() {
- return loc;
- }
-
- /** @return Remote nodes. */
- Collection<ClusterNode> remoteNodes() {
- return rmtNodes;
- }
-
- /** @return All nodes. */
- Collection<ClusterNode> allNodes() {
- return allNodes;
- }
-
- /**
- * Gets collection of nodes which have version equal or greater than {@code ver}.
- *
- * @param ver Version to check.
- * @return Collection of nodes with version equal or greater than {@code ver}.
- */
- Collection<ClusterNode> elderNodes(IgniteProductVersion ver) {
- Map.Entry<IgniteProductVersion, Collection<ClusterNode>> entry = nodesByVer.ceilingEntry(ver);
-
- if (entry == null)
- return Collections.emptyList();
-
- return entry.getValue();
- }
-
- /**
- * @return Versions map.
- */
- NavigableMap<IgniteProductVersion, Collection<ClusterNode>> versionsMap() {
- return nodesByVer;
- }
-
- /**
- * Gets collection of nodes with at least one cache configured.
- *
- * @param topVer Topology version (maximum allowed node order).
- * @return Collection of nodes.
- */
- Collection<ClusterNode> allNodesWithCaches(final long topVer) {
- return filter(topVer, allNodesWithCaches);
- }
-
- /**
- * Gets all nodes that have cache with given name.
- *
- * @param cacheName Cache name.
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> cacheNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, allCacheNodes.get(cacheName));
- }
-
- /**
- * Gets all remote nodes that have at least one cache configured.
- *
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> remoteCacheNodes(final long topVer) {
- return filter(topVer, rmtNodesWithCaches);
- }
-
- /**
- * Gets all nodes that have cache with given name and should participate in affinity calculation. With
- * partitioned cache nodes with near-only cache do not participate in affinity node calculation.
- *
- * @param cacheName Cache name.
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, affCacheNodes.get(cacheName));
- }
-
- /**
- * Gets all alive nodes that have cache with given name.
- *
- * @param cacheName Cache name.
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, aliveCacheNodes.get(maskNull(cacheName)));
- }
-
- /**
- * Gets all alive remote nodes that have cache with given name.
- *
- * @param cacheName Cache name.
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, aliveRmtCacheNodes.get(maskNull(cacheName)));
- }
-
- /**
- * Checks if cache with given name has at least one node with near cache enabled.
- *
- * @param cacheName Cache name.
- * @return {@code True} if cache with given name has at least one node with near cache enabled.
- */
- boolean hasNearCache(@Nullable String cacheName) {
- return nearEnabledCaches.contains(cacheName);
- }
-
- /**
- * Removes left node from cached alives lists.
- *
- * @param leftNode Left node.
- */
- void updateAlives(ClusterNode leftNode) {
- if (leftNode.order() > maxOrder)
- return;
-
- filterNodeMap(aliveCacheNodes, leftNode);
-
- filterNodeMap(aliveRmtCacheNodes, leftNode);
-
- aliveSrvNodesWithCaches.remove(leftNode);
- }
-
- /**
- * Creates a copy of nodes map without the given node.
- *
- * @param map Map to copy.
- * @param exclNode Node to exclude.
- */
- private void filterNodeMap(ConcurrentMap<String, Collection<ClusterNode>> map, final ClusterNode exclNode) {
- for (String cacheName : registeredCaches.keySet()) {
- String maskedName = maskNull(cacheName);
-
- while (true) {
- Collection<ClusterNode> oldNodes = map.get(maskedName);
-
- if (oldNodes == null || oldNodes.isEmpty())
- break;
-
- Collection<ClusterNode> newNodes = new ArrayList<>(oldNodes);
-
- if (!newNodes.remove(exclNode))
- break;
-
- if (map.replace(maskedName, oldNodes, newNodes))
- break;
- }
- }
- }
-
- /**
- * Replaces {@code null} with {@code NULL_CACHE_NAME}.
- *
- * @param cacheName Cache name.
- * @return Masked name.
- */
- private String maskNull(@Nullable String cacheName) {
- return cacheName == null ? NULL_CACHE_NAME : cacheName;
- }
-
- /**
- * @param topVer Topology version.
- * @param nodes Nodes.
- * @return Filtered collection (potentially empty, but never {@code null}).
- */
- private Collection<ClusterNode> filter(final long topVer, @Nullable Collection<ClusterNode> nodes) {
- if (nodes == null)
- return Collections.emptyList();
-
- // If no filtering needed, return original collection.
- return nodes.isEmpty() || topVer < 0 || topVer >= maxOrder ?
- nodes :
- F.view(nodes, new P1<ClusterNode>() {
- @Override public boolean apply(ClusterNode node) {
- return node.order() <= topVer;
- }
- });
- }
-
- /** @return Daemon nodes. */
- Collection<ClusterNode> daemonNodes() {
- return daemonNodes;
- }
-
- /**
- * @param id Node ID.
- * @return Node.
- */
- @Nullable ClusterNode node(UUID id) {
- return nodeMap.get(id);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(DiscoCache.class, this, "allNodesWithDaemons", U.toShortString(allNodes));
- }
- }
-
/**
* Cache predicate.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/DiscoveryEventListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/DiscoveryEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/DiscoveryEventListener.java
new file mode 100644
index 0000000..963d97e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/DiscoveryEventListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.eventstorage;
+
+import java.util.EventListener;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+
+/**
+ * Internal listener for discovery events.
+ */
+public interface DiscoveryEventListener extends EventListener {
+ /**
+ * @param evt Discovery event.
+ * @param discoCache Discovery cache.
+ */
+ public void onEvent(DiscoveryEvent evt, DiscoCache discoCache);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index 607bb96..a2c64ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -79,6 +80,9 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
/** Local event listeners. */
private final ConcurrentMap<Integer, Set<GridLocalEventListener>> lsnrs = new ConcurrentHashMap8<>();
+ /** Internal discovery listeners. */
+ private final ConcurrentMap<Integer, Set<DiscoveryEventListener>> discoLsnrs = new ConcurrentHashMap8<>();
+
/** Busy lock to control activity of threads. */
private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
@@ -234,6 +238,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
msgLsnr = null;
lsnrs.clear();
+ discoLsnrs.clear();
}
/** {@inheritDoc} */
@@ -300,6 +305,30 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
/**
+ * Records discovery events.
+ *
+ * @param evt Event to record.
+ * @param discoCache Discovery cache.
+ */
+ public void record(DiscoveryEvent evt, DiscoCache discoCache) {
+ assert evt != null;
+
+ if (!enterBusy())
+ return;
+
+ try {
+ // Notify internal discovery listeners first.
+ notifyDiscoveryListeners(evt, discoCache);
+
+ // Notify all other registered listeners.
+ record(evt);
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /**
* Gets types of enabled user-recordable events.
*
* @return Array of types of enabled user-recordable events.
@@ -570,7 +599,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
try {
for (int t : types) {
- getOrCreate(t).add(lsnr);
+ getOrCreate(lsnrs, t).add(lsnr);
if (!isRecordable(t))
U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
@@ -595,14 +624,14 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
return;
try {
- getOrCreate(type).add(lsnr);
+ getOrCreate(lsnrs, type).add(lsnr);
if (!isRecordable(type))
U.warn(log, "Added listener for disabled event type: " + U.gridEventName(type));
if (types != null) {
for (int t : types) {
- getOrCreate(t).add(lsnr);
+ getOrCreate(lsnrs, t).add(lsnr);
if (!isRecordable(t))
U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
@@ -615,16 +644,70 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
/**
+ * Adds discovery event listener. Note that this method specifically disallows an empty
+ * array of event types to prevent accidental subscription to all system events, which
+ * may lead to a drastic performance decrease.
+ *
+ * @param lsnr Listener to add.
+ * @param types Event types to subscribe listener for.
+ */
+ public void addDiscoveryEventListener(DiscoveryEventListener lsnr, int[] types) {
+ assert lsnr != null;
+ assert types != null;
+ assert types.length > 0;
+
+ if (!enterBusy())
+ return;
+
+ try {
+ for (int t : types) {
+ getOrCreate(discoLsnrs, t).add(lsnr);
+ }
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /**
+ * Adds discovery event listener.
+ *
+ * @param lsnr Listener to add.
+ * @param type Event type to subscribe listener for.
+ * @param types Additional event types to subscribe listener for.
+ */
+ public void addDiscoveryEventListener(DiscoveryEventListener lsnr, int type, @Nullable int... types) {
+ assert lsnr != null;
+
+ if (!enterBusy())
+ return;
+
+ try {
+ getOrCreate(discoLsnrs, type).add(lsnr);
+
+ if (types != null) {
+ for (int t : types) {
+ getOrCreate(discoLsnrs, t).add(lsnr);
+ }
+ }
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /**
+ * @param lsnrs Listeners map.
* @param type Event type.
* @return Listeners for given event type.
*/
- private Collection<GridLocalEventListener> getOrCreate(Integer type) {
- Set<GridLocalEventListener> set = lsnrs.get(type);
+ private <T> Collection<T> getOrCreate(ConcurrentMap<Integer, Set<T>> lsnrs, Integer type) {
+ Set<T> set = lsnrs.get(type);
if (set == null) {
set = new GridConcurrentLinkedHashSet<>();
- Set<GridLocalEventListener> prev = lsnrs.putIfAbsent(type, set);
+ Set<T> prev = lsnrs.putIfAbsent(type, set);
if (prev != null)
set = prev;
@@ -688,6 +771,38 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
/**
+ * Removes listener for specified events, if any. If no event types are provided, it
+ * removes the listener for all its registered events.
+ *
+ * @param lsnr Listener.
+ * @param types Event types.
+ * @return Returns {@code true} if removed.
+ */
+ public boolean removeDiscoveryEventListener(DiscoveryEventListener lsnr, @Nullable int... types) {
+ assert lsnr != null;
+
+ boolean found = false;
+
+ if (F.isEmpty(types)) {
+ for (Set<DiscoveryEventListener> set : discoLsnrs.values())
+ if (set.remove(lsnr))
+ found = true;
+ }
+ else {
+ assert types != null;
+
+ for (int type : types) {
+ Set<DiscoveryEventListener> set = discoLsnrs.get(type);
+
+ if (set != null && set.remove(lsnr))
+ found = true;
+ }
+ }
+
+ return found;
+ }
+
+ /**
*
* @param p Optional predicate.
* @param types Event types to wait for.
@@ -780,6 +895,41 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
/**
+ * @param evt Discovery event.
+ * @param cache Discovery cache.
+ */
+ private void notifyDiscoveryListeners(DiscoveryEvent evt, DiscoCache cache) {
+ assert evt != null;
+
+ notifyDiscoveryListeners(discoLsnrs.get(evt.type()), evt, cache);
+ }
+
+ /**
+ * @param set Set of listeners.
+ * @param evt Discovery event.
+ * @param cache Discovery cache.
+ */
+ private void notifyDiscoveryListeners(@Nullable Collection<DiscoveryEventListener> set, DiscoveryEvent evt, DiscoCache cache) {
+ assert evt != null;
+
+ if (!F.isEmpty(set)) {
+ assert set != null;
+
+ for (DiscoveryEventListener lsnr : set) {
+ try {
+ lsnr.onEvent(evt, cache);
+ }
+ catch (Throwable e) {
+ U.error(log, "Unexpected exception in listener notification for event: " + evt, e);
+
+ if (e instanceof Error)
+ throw (Error)e;
+ }
+ }
+ }
+ }
+
+ /**
* @param p Grid event predicate.
* @return Collection of grid events.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index a388c7a..5070462 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -39,6 +39,7 @@ import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridNodeOrderComparator;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -252,10 +253,12 @@ public class GridAffinityAssignmentCache {
*
* @param topVer Topology version to calculate affinity cache for.
* @param discoEvt Discovery event that caused this topology version change.
+ * @param discoCache Discovery cache.
* @return Affinity assignments.
*/
@SuppressWarnings("IfMayBeConditional")
- public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) {
+ public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt,
+ DiscoCache discoCache) {
if (log.isDebugEnabled())
log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() +
", discoEvt=" + discoEvt + ']');
@@ -266,7 +269,7 @@ public class GridAffinityAssignmentCache {
List<ClusterNode> sorted;
if (!locCache) {
- sorted = new ArrayList<>(ctx.discovery().cacheAffinityNodes(cacheName, topVer));
+ sorted = new ArrayList<>(discoCache.cacheAffinityNodes(cacheId()));
Collections.sort(sorted, GridNodeOrderComparator.INSTANCE);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 2890887..2642d16 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -382,7 +382,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
cctx.cache().prepareCacheStart(req, fut.topologyVersion());
if (fut.isCacheAdded(cacheId, fut.topologyVersion())) {
- if (cctx.discovery().cacheAffinityNodes(req.cacheName(), fut.topologyVersion()).isEmpty())
+ if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty())
U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
}
@@ -403,7 +403,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
- fut.discoveryEvent());
+ fut.discoveryEvent(), fut.discoCache());
aff.initialize(fut.topologyVersion(), assignment);
}
@@ -753,7 +753,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert old == null : old;
- List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent());
+ List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
cache.affinity().initialize(fut.topologyVersion(), newAff);
}
@@ -791,7 +791,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) {
List<List<ClusterNode>> assignment =
- cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent());
+ cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
cache.affinity().initialize(fut.topologyVersion(), assignment);
}
@@ -817,14 +817,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
private void initAffinity(GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture fut, boolean fetch)
throws IgniteCheckedException {
if (!fetch && canCalculateAffinity(aff, fut)) {
- List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent());
+ List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
aff.initialize(fut.topologyVersion(), assignment);
}
else {
GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
aff.cacheName(),
- fut.topologyVersion());
+ fut.topologyVersion(),
+ fut.discoCache());
fetchFut.init();
@@ -878,7 +879,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
CacheHolder cache = cache(fut, cacheDesc);
- List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent());
+ List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent(), fut.discoCache());
cache.affinity().initialize(topVer, newAff);
}
@@ -945,14 +946,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (cctx.localNodeId().equals(cacheDesc.receivedFrom())) {
List<List<ClusterNode>> assignment =
- cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent());
+ cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
cacheCtx.affinity().affinityCache().initialize(fut.topologyVersion(), assignment);
}
else {
GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
cacheCtx.name(),
- topVer);
+ topVer,
+ fut.discoCache());
fetchFut.init();
@@ -986,7 +988,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
GridDhtAffinityAssignmentResponse res = fetchFut.get();
if (res == null) {
- List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent());
+ List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
affCache.initialize(topVer, aff);
}
@@ -998,7 +1000,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
else {
assert !affCache.centralizedAffinityFunction() || !lateAffAssign;
- affCache.calculate(topVer, fut.discoveryEvent());
+ affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
}
List<List<ClusterNode>> aff = res.affinityAssignment(cctx.discovery());
@@ -1028,7 +1030,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (cacheCtx.isLocal())
continue;
- cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent());
+ cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
}
centralizedAff = true;
@@ -1078,7 +1080,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (cache != null) {
if (cache.client())
- cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent());
+ cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
return;
}
@@ -1118,7 +1120,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
aff.cacheName(),
- prev.topologyVersion());
+ prev.topologyVersion(),
+ prev.discoCache());
fetchFut.init();
@@ -1129,7 +1132,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
throws IgniteCheckedException {
fetchAffinity(prev, aff, (GridDhtAssignmentFetchFuture)fetchFut);
- aff.calculate(fut.topologyVersion(), fut.discoveryEvent());
+ aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
affFut.onDone(fut.topologyVersion());
}
@@ -1269,7 +1272,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert aff.idealAssignment() != null : "Previous assignment is not available.";
- List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent());
+ List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
List<List<ClusterNode>> newAssignment = null;
if (latePrimary) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 8b7be1b..d9ff616 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -77,7 +77,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
@Override protected void onKernalStart0() throws IgniteCheckedException {
if (cctx.isLocal())
// No discovery event needed for local affinity.
- aff.calculate(LOC_CACHE_TOP_VER, null);
+ aff.calculate(LOC_CACHE_TOP_VER, null, null);
}
/** {@inheritDoc} */
[3/3] ignite git commit: Merge remote-tracking branch
'remotes/upstream/ignite-1.8.4-p1'
Posted by av...@apache.org.
Merge remote-tracking branch 'remotes/upstream/ignite-1.8.4-p1'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/889594f4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/889594f4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/889594f4
Branch: refs/heads/master
Commit: 889594f454da8d9b2e7085825557143aabfb13f0
Parents: d34a4d0 8273e67
Author: Anton Vinogradov <av...@apache.org>
Authored: Mon Apr 3 18:27:18 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Mon Apr 3 18:27:18 2017 +0300
----------------------------------------------------------------------
----------------------------------------------------------------------