You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/06 10:45:43 UTC
[06/50] [abbrv] ignite git commit: IGNITE-4779 Missed discovery data
snapshot during exchange processing (Backport from 2.0)
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 7cf75fe..81f21a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -48,14 +48,14 @@ import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
@@ -173,35 +173,33 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
private DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
/** Discovery listener. */
- private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
+ private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
+ @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
if (!enterBusy())
return;
try {
- DiscoveryEvent e = (DiscoveryEvent)evt;
-
ClusterNode loc = cctx.localNode();
- assert e.type() == EVT_NODE_JOINED || e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED ||
- e.type() == EVT_DISCOVERY_CUSTOM_EVT;
+ assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ||
+ evt.type() == EVT_DISCOVERY_CUSTOM_EVT;
- final ClusterNode n = e.eventNode();
+ final ClusterNode n = evt.eventNode();
GridDhtPartitionExchangeId exchId = null;
GridDhtPartitionsExchangeFuture exchFut = null;
- if (e.type() != EVT_DISCOVERY_CUSTOM_EVT) {
+ if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) {
assert !loc.id().equals(n.id());
- if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) {
+ if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) {
assert cctx.discovery().node(n.id()) == null;
// Avoid race b/w initial future add and discovery event.
GridDhtPartitionsExchangeFuture initFut = null;
if (readyTopVer.get().equals(AffinityTopologyVersion.NONE)) {
- initFut = exchangeFuture(initialExchangeId(), null, null, null);
+ initFut = exchangeFuture(initialExchangeId(), null, null, null, null);
initFut.onNodeLeft(n);
}
@@ -213,18 +211,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
assert
- e.type() != EVT_NODE_JOINED || n.order() > loc.order() :
+ evt.type() != EVT_NODE_JOINED || n.order() > loc.order() :
"Node joined with smaller-than-local " +
"order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
exchId = exchangeId(n.id(),
- affinityTopologyVersion(e),
- e.type());
+ affinityTopologyVersion(evt),
+ evt.type());
- exchFut = exchangeFuture(exchId, e, null, null);
+ exchFut = exchangeFuture(exchId, evt, cache,null, null);
}
else {
- DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e;
+ DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt;
if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) {
DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage();
@@ -254,9 +252,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
if (!F.isEmpty(valid)) {
- exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
- exchFut = exchangeFuture(exchId, e, valid, null);
+ exchFut = exchangeFuture(exchId, evt, cache, valid, null);
}
}
else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) {
@@ -264,13 +262,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (msg.exchangeId() == null) {
if (msg.exchangeNeeded()) {
- exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
- exchFut = exchangeFuture(exchId, e, null, msg);
+ exchFut = exchangeFuture(exchId, evt, cache, null, msg);
}
}
else
- exchangeFuture(msg.exchangeId(), null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
+ exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
}
}
@@ -279,7 +277,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
log.debug("Discovery event (will start exchange): " + exchId);
// Event callback - without this callback future will never complete.
- exchFut.onEvent(exchId, e);
+ exchFut.onEvent(exchId, evt, cache);
// Start exchange process.
addFuture(exchFut);
@@ -301,7 +299,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchWorker = new ExchangeWorker();
- cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
+ cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
EVT_DISCOVERY_CUSTOM_EVT);
cctx.io().addHandler(0, GridDhtPartitionsSingleMessage.class,
@@ -359,11 +357,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
assert startTime > 0;
// Generate dummy discovery event for local node joining.
- DiscoveryEvent discoEvt = cctx.discovery().localJoinEvent();
+ T2<DiscoveryEvent, DiscoCache> localJoin = cctx.discovery().localJoin();
+
+ DiscoveryEvent discoEvt = localJoin.get1();
+ DiscoCache discoCache = localJoin.get2();
GridDhtPartitionExchangeId exchId = initialExchangeId();
- GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, null, null);
+ GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, discoCache, null, null);
if (reconnect)
reconnectExchangeFut = new GridFutureAdapter<>();
@@ -470,7 +471,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
- cctx.gridEvents().removeLocalEventListener(discoLsnr);
+ cctx.gridEvents().removeDiscoveryEventListener(discoLsnr);
cctx.io().removeHandler(0, GridDhtPartitionsSingleMessage.class);
cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class);
@@ -1063,12 +1064,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* @param exchId Exchange ID.
* @param discoEvt Discovery event.
+ * @param cache Discovery data cache.
* @param reqs Cache change requests.
* @param affChangeMsg Affinity change message.
* @return Exchange future.
*/
private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
@Nullable DiscoveryEvent discoEvt,
+ @Nullable DiscoCache cache,
@Nullable Collection<DynamicCacheChangeRequest> reqs,
@Nullable CacheAffinityChangeMessage affChangeMsg) {
GridDhtPartitionsExchangeFuture fut;
@@ -1087,7 +1090,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
if (discoEvt != null)
- fut.onEvent(exchId, discoEvt);
+ fut.onEvent(exchId, discoEvt, cache);
if (stopErr != null)
fut.onDone(stopErr);
@@ -1231,7 +1234,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
refreshPartitions();
}
else
- exchangeFuture(msg.exchangeId(), null, null, null).onReceive(node, msg);
+ exchangeFuture(msg.exchangeId(), null, null, null, null).onReceive(node, msg);
}
finally {
leaveBusy();
@@ -1287,6 +1290,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
null,
null,
+ null,
null);
exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@@ -1297,7 +1301,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
});
}
else
- exchangeFuture(msg.exchangeId(), null, null, null).onReceive(node, msg);
+ exchangeFuture(msg.exchangeId(), null, null, null, null).onReceive(node, msg);
}
}
finally {
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 816132d..9366d0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -31,6 +31,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -42,7 +43,6 @@ import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -103,6 +103,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** */
private final Object similarAffKey;
+ /** Discovery cache. */
+ private volatile DiscoCache discoCache;
+
/**
* @param cctx Context.
* @param cacheId Cache ID.
@@ -121,6 +124,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
topVer = exchFut.topologyVersion();
+ discoCache = exchFut.discoCache();
+
log = cctx.logger(getClass());
lock.writeLock().lock();
@@ -191,6 +196,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
this.stopping = stopping;
topVer = exchId.topologyVersion();
+ discoCache = exchFut.discoCache();
updateSeq.setIfGreater(updSeq);
@@ -271,7 +277,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
removeNode(exchId.nodeId());
// In case if node joins, get topology at the time of joining node.
- ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
assert oldest != null;
@@ -424,7 +430,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
if (!F.isEmpty(nodeIds)) {
for (UUID nodeId : nodeIds) {
- ClusterNode n = cctx.discovery().node(nodeId);
+ ClusterNode n = discoCache.node(nodeId);
if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) {
if (nodes == null)
@@ -450,7 +456,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
* @return List of nodes for the partition.
*/
private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
- Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null;
+ Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.allNodesWithCaches()) : null;
lock.readLock().lock();
@@ -473,7 +479,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
continue;
if (hasState(p, id, state, states)) {
- ClusterNode n = cctx.discovery().node(id);
+ ClusterNode n = discoCache.node(id);
if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion()))
nodes.add(n);
@@ -766,7 +772,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
assert nodeId.equals(cctx.localNodeId());
// In case if node joins, get topology at the time of joining node.
- ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
// If this node became the oldest node.
if (oldest.id().equals(cctx.localNodeId())) {
@@ -816,7 +822,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
assert nodeId != null;
assert lock.writeLock().isHeldByCurrentThread();
- ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
ClusterNode loc = cctx.localNode();
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index ab8e863..2c3d7ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridNodeOrderComparator;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -72,16 +73,18 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
* @param ctx Context.
* @param cacheName Cache name.
* @param topVer Topology version.
+ * @param discoCache Discovery cache.
*/
public GridDhtAssignmentFetchFuture(
GridCacheSharedContext ctx,
String cacheName,
- AffinityTopologyVersion topVer
+ AffinityTopologyVersion topVer,
+ DiscoCache discoCache
) {
this.ctx = ctx;
this.key = new T2<>(CU.cacheId(cacheName), topVer);
- Collection<ClusterNode> availableNodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer);
+ Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(CU.cacheId(cacheName));
LinkedList<ClusterNode> tmp = new LinkedList<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 1b4dcc9..5c3fba0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -95,6 +96,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** */
private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
+ /** Discovery cache. */
+ private volatile DiscoCache discoCache;
+
/** */
private volatile boolean stopping;
@@ -151,6 +155,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
rebalancedTopVer = AffinityTopologyVersion.NONE;
topVer = AffinityTopologyVersion.NONE;
+
+ discoCache = cctx.discovery().discoCache();
}
finally {
lock.writeLock().unlock();
@@ -293,6 +299,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
rebalancedTopVer = AffinityTopologyVersion.NONE;
topVer = exchId.topologyVersion();
+
+ discoCache = exchFut.discoCache();
}
finally {
lock.writeLock().unlock();
@@ -356,7 +364,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
ClusterNode loc = cctx.localNode();
- ClusterNode oldest = currentCoordinator();
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
@@ -481,7 +489,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (exchId.isLeft())
removeNode(exchId.nodeId());
- ClusterNode oldest = currentCoordinator();
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
if (log.isDebugEnabled())
log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
@@ -882,7 +890,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
AffinityTopologyVersion topVer,
GridDhtPartitionState state,
GridDhtPartitionState... states) {
- Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null;
+ Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.cacheAffinityNodes(cctx.cacheId())) : null;
lock.readLock().lock();
@@ -979,7 +987,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionFullMap partMap,
@Nullable Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
@@ -1112,7 +1120,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionMap2 parts,
@Nullable Map<Integer, Long> cntrMap,
boolean checkEvictions) {
@@ -1284,7 +1292,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
List<ClusterNode> affNodes = aff.get(p);
if (!affNodes.contains(cctx.localNode())) {
- Collection<UUID> nodeIds = F.nodeIds(nodes(p, topVer, OWNING));
+ List<ClusterNode> nodes = nodes(p, topVer, OWNING);
+ Collection<UUID> nodeIds = F.nodeIds(nodes);
// If all affinity nodes are owners, then evict partition from local node.
if (nodeIds.containsAll(F.nodeIds(affNodes))) {
@@ -1302,15 +1311,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
int affCnt = affNodes.size();
if (ownerCnt > affCnt) {
- List<ClusterNode> sorted = new ArrayList<>(cctx.discovery().nodes(nodeIds));
-
// Sort by node orders in ascending order.
- Collections.sort(sorted, CU.nodeComparator(true));
+ Collections.sort(nodes, CU.nodeComparator(true));
- int diff = sorted.size() - affCnt;
+ int diff = nodes.size() - affCnt;
for (int i = 0; i < diff; i++) {
- ClusterNode n = sorted.get(i);
+ ClusterNode n = nodes.get(i);
if (locId.equals(n.id())) {
part.rent(false);
@@ -1336,17 +1343,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/**
- * @return Current coordinator node.
- */
- @Nullable private ClusterNode currentCoordinator() {
- ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
-
- assert oldest != null || cctx.kernalContext().clientNode();
-
- return oldest;
- }
-
- /**
* Updates value for single partition.
*
* @param p Partition.
@@ -1356,7 +1352,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
*/
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) {
- ClusterNode oldest = currentCoordinator();
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
assert oldest != null || cctx.kernalContext().clientNode();
@@ -1421,7 +1417,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
private void removeNode(UUID nodeId) {
assert nodeId != null;
- ClusterNode oldest = CU.oldest(cctx.discovery().serverNodes(topVer));
+ ClusterNode oldest = discoCache.oldestAliveServerNode();
assert oldest != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 44780f1..c91f881 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2441,7 +2441,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
AffinityTopologyVersion topVer = req.topologyVersion();
- boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer);
+ boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer);
boolean readersOnly = false;
@@ -2676,7 +2676,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
AffinityTopologyVersion topVer = req.topologyVersion();
- boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer);
+ boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer);
CacheStorePartialUpdateException storeErr = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e945de9..7b5d09b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -44,7 +44,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
@@ -103,6 +103,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** Dummy reassign flag. */
private final boolean reassign;
+ /** Discovery cache. */
+ @GridToStringExclude
+ private volatile DiscoCache discoCache;
+
/** Discovery event. */
private volatile DiscoveryEvent discoEvt;
@@ -147,9 +151,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** */
private boolean init;
- /** Topology snapshot. */
- private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>();
-
/** Last committed cache version before next topology version use. */
private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>();
@@ -332,6 +333,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
+ * @return Discovery cache.
+ */
+ public DiscoCache discoCache() {
+ return discoCache;
+ }
+
+ /**
* @param cacheId Cache ID to check.
* @param topVer Topology version.
* @return {@code True} if cache was added during this exchange.
@@ -374,11 +382,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
*
* @param exchId Exchange ID.
* @param discoEvt Discovery event.
+ * @param discoCache Discovery data cache.
*/
- public void onEvent(GridDhtPartitionExchangeId exchId, DiscoveryEvent discoEvt) {
+ public void onEvent(GridDhtPartitionExchangeId exchId, DiscoveryEvent discoEvt, DiscoCache discoCache) {
assert exchId.equals(this.exchId);
this.discoEvt = discoEvt;
+ this.discoCache = discoCache;
evtLatch.countDown();
}
@@ -435,7 +445,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
assert !dummy && !forcePreload : this;
try {
- srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topologyVersion()));
+ discoCache.updateAlives(cctx.discovery());
+
+ srvNodes = new ArrayList<>(discoCache.serverNodes());
remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId()))));
@@ -550,7 +562,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
if (updateTop && clientTop != null)
- cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false));
+ top.update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false));
}
top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId()));
@@ -842,7 +854,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
List<String> cachesWithoutNodes = null;
for (String name : cctx.cache().cacheNames()) {
- if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) {
+ if (discoCache.cacheAffinityNodes(name).isEmpty()) {
if (cachesWithoutNodes == null)
cachesWithoutNodes = new ArrayList<>();
@@ -1096,7 +1108,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* Cleans up resources to avoid excessive memory usage.
*/
public void cleanUp() {
- topSnapshot.set(null);
singleMsgs.clear();
fullMsgs.clear();
crd = null;
@@ -1252,7 +1263,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
try {
assert crd.isLocal();
- if (!crd.equals(cctx.discovery().serverNodes(topologyVersion()).get(0))) {
+ if (!crd.equals(discoCache.serverNodes().get(0))) {
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (!cacheCtx.isLocal())
cacheCtx.topology().beforeExchange(GridDhtPartitionsExchangeFuture.this, !centralizedAff);
@@ -1559,6 +1570,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
ClusterNode crd0;
+ discoCache.updateAlives(node);
+
synchronized (mux) {
if (!srvNodes.remove(node))
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index d26242d..99146aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -48,7 +48,6 @@ import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridClosureCallMode;
import org.apache.ignite.internal.GridKernalContext;
@@ -58,8 +57,9 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
@@ -167,7 +167,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
private IgniteInternalCache<Object, Object> cache;
/** Topology listener. */
- private GridLocalEventListener topLsnr = new TopologyListener();
+ private DiscoveryEventListener topLsnr = new TopologyListener();
static {
Set<IgniteProductVersion> versions = new TreeSet<>(new Comparator<IgniteProductVersion>() {
@@ -251,7 +251,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
cache = ctx.cache().utilityCache();
if (!ctx.clientNode())
- ctx.event().addLocalEventListener(topLsnr, EVTS);
+ ctx.event().addDiscoveryEventListener(topLsnr, EVTS);
try {
if (ctx.deploy().enabled())
@@ -314,7 +314,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
busyLock.block();
if (!ctx.clientNode())
- ctx.event().removeLocalEventListener(topLsnr);
+ ctx.event().removeDiscoveryEventListener(topLsnr);
Collection<ServiceContextImpl> ctxs = new ArrayList<>();
@@ -1568,9 +1568,9 @@ public class GridServiceProcessor extends GridProcessorAdapter {
/**
* Topology listener.
*/
- private class TopologyListener implements GridLocalEventListener {
+ private class TopologyListener implements DiscoveryEventListener {
/** {@inheritDoc} */
- @Override public void onEvent(Event evt) {
+ @Override public void onEvent(DiscoveryEvent evt, final DiscoCache discoCache) {
if (!busyLock.enterBusy())
return;
@@ -1588,11 +1588,14 @@ public class GridServiceProcessor extends GridProcessorAdapter {
}
}
else
- topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
+ topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0);
depExe.execute(new BusyRunnable() {
@Override public void run0() {
- ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
+ // In case the cache instance isn't tracked by DiscoveryManager anymore.
+ discoCache.updateAlives(ctx.discovery());
+
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
if (oldest != null && oldest.isLocal()) {
final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
index 31b4bc7..f0c50eb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
@@ -23,16 +23,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.Ignite;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgnitePredicate;
@@ -90,8 +85,8 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe
}
/** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
CacheConfiguration cCfg = defaultCacheConfiguration();
@@ -103,7 +98,7 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe
TcpDiscoverySpi disc = new TcpDiscoverySpi();
- if (clientMode && ((gridName.charAt(gridName.length() - 1) - '0') & 1) != 0)
+ if (clientMode && ((igniteInstanceName.charAt(igniteInstanceName.length() - 1) - '0') & 1) != 0)
cfg.setClientMode(true);
else
disc.setMaxMissedClientHeartbeats(50);
@@ -158,8 +153,6 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe
stopTempNodes();
latch.await();
-
- validateAlives();
}
}
@@ -200,55 +193,6 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe
}
/**
- * Validates that all node collections contain actual information.
- */
- @SuppressWarnings("SuspiciousMethodCalls")
- private void validateAlives() {
- for (Ignite g : alive) {
- log.info("Validate node: " + g.name());
-
- assertEquals("Unexpected nodes number for node: " + g.name(), PERM_NODES_CNT, g.cluster().nodes().size());
- }
-
- for (final Ignite g : alive) {
- IgniteKernal k = (IgniteKernal)g;
-
- GridDiscoveryManager discoMgr = k.context().discovery();
-
- final Collection<ClusterNode> currTop = g.cluster().nodes();
-
- long currVer = discoMgr.topologyVersion();
-
- long startVer = discoMgr.localNode().order();
-
- for (long v = currVer; v > currVer - GridDiscoveryManager.DISCOVERY_HISTORY_SIZE && v >= startVer; v--) {
- F.forAll(discoMgr.aliveCacheNodes(null, new AffinityTopologyVersion(v)),
- new IgnitePredicate<ClusterNode>() {
- @Override public boolean apply(ClusterNode e) {
- return currTop.contains(e);
- }
- });
-
- F.forAll(discoMgr.aliveRemoteCacheNodes(null, new AffinityTopologyVersion(v)),
- new IgnitePredicate<ClusterNode>() {
- @Override public boolean apply(ClusterNode e) {
- return currTop.contains(e) || g.cluster().localNode().equals(e);
- }
- });
-
- GridCacheSharedContext<?, ?> ctx = k.context().cache().context();
-
- ClusterNode oldest =
- ctx.discovery().oldestAliveCacheServerNode(new AffinityTopologyVersion(currVer));
-
- assertNotNull(oldest);
-
- assertTrue(currTop.contains(oldest));
- }
- }
- }
-
- /**
* Starts temporary nodes.
*
* @throws Exception If failed.
@@ -293,4 +237,4 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe
G.stop(g.name(), false);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java
index ba8fa5b..5de2910 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java
@@ -54,10 +54,10 @@ public abstract class GridDiscoveryManagerAttributesSelfTest extends GridCommonA
private static boolean binaryMarshallerEnabled;
/** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- if (gridName.equals(getTestGridName(1)))
+ if (igniteInstanceName.equals(getTestGridName(1)))
cfg.setClientMode(true);
if (binaryMarshallerEnabled)
@@ -160,7 +160,7 @@ public abstract class GridDiscoveryManagerAttributesSelfTest extends GridCommonA
if (fail)
fail("Node should not join");
}
- catch (Exception e) {
+ catch (Exception ignored) {
if (!fail)
fail("Node should join");
}
@@ -215,7 +215,7 @@ public abstract class GridDiscoveryManagerAttributesSelfTest extends GridCommonA
if (fail)
fail("Node should not join");
}
- catch (Exception e) {
+ catch (Exception ignored) {
if (!fail)
fail("Node should join");
}
@@ -346,8 +346,8 @@ public abstract class GridDiscoveryManagerAttributesSelfTest extends GridCommonA
*/
public static class RegularDiscovery extends GridDiscoveryManagerAttributesSelfTest {
/** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java
deleted file mode 100644
index c9179d4..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.managers.discovery;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.Ignition;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-
-/**
- *
- */
-public abstract class GridDiscoveryManagerSelfTest extends GridCommonAbstractTest {
- /** */
- private static final String CACHE_NAME = "cache";
-
- /** */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids();
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("IfMayBeConditional")
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- CacheConfiguration ccfg1 = defaultCacheConfiguration();
-
- ccfg1.setName(CACHE_NAME);
-
- CacheConfiguration ccfg2 = defaultCacheConfiguration();
-
- ccfg2.setName(null);
-
- if (gridName.equals(getTestGridName(1)))
- cfg.setClientMode(true);
- else {
- ccfg1.setNearConfiguration(null);
- ccfg2.setNearConfiguration(null);
-
- ccfg1.setCacheMode(PARTITIONED);
- ccfg2.setCacheMode(PARTITIONED);
-
- cfg.setCacheConfiguration(ccfg1, ccfg2);
- }
-
- TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
-
- discoverySpi.setIpFinder(IP_FINDER);
-
- cfg.setDiscoverySpi(discoverySpi);
-
- return cfg;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testHasNearCache() throws Exception {
- IgniteKernal g0 = (IgniteKernal)startGrid(0); // PARTITIONED_ONLY cache.
-
- AffinityTopologyVersion none = new AffinityTopologyVersion(-1);
- AffinityTopologyVersion one = new AffinityTopologyVersion(1);
- AffinityTopologyVersion two = new AffinityTopologyVersion(2, 2);
- AffinityTopologyVersion three = new AffinityTopologyVersion(3);
- AffinityTopologyVersion four = new AffinityTopologyVersion(4);
- AffinityTopologyVersion five = new AffinityTopologyVersion(5);
-
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, none));
- assertFalse(g0.context().discovery().hasNearCache(null, none));
-
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
- assertFalse(g0.context().discovery().hasNearCache(null, one));
-
- IgniteKernal g1 = (IgniteKernal)startGrid(1); // NEAR_ONLY cache.
-
- grid(1).createNearCache(null, new NearCacheConfiguration());
-
- grid(1).createNearCache(CACHE_NAME, new NearCacheConfiguration());
-
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two));
- assertFalse(g0.context().discovery().hasNearCache(null, one));
- assertTrue(g0.context().discovery().hasNearCache(null, two));
-
- assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, two));
- assertTrue(g1.context().discovery().hasNearCache(null, two));
-
- IgniteKernal g2 = (IgniteKernal)startGrid(2); // PARTITIONED_ONLY cache.
-
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, three));
- assertFalse(g0.context().discovery().hasNearCache(null, one));
- assertTrue(g0.context().discovery().hasNearCache(null, two));
- assertTrue(g0.context().discovery().hasNearCache(null, three));
-
- assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, two));
- assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, three));
- assertTrue(g1.context().discovery().hasNearCache(null, two));
- assertTrue(g1.context().discovery().hasNearCache(null, three));
-
- assertTrue(g2.context().discovery().hasNearCache(CACHE_NAME, three));
- assertTrue(g2.context().discovery().hasNearCache(null, three));
-
- stopGrid(2);
-
- // Wait all nodes are on version 4.
- for (;;) {
- if (F.forAll(
- Ignition.allGrids(),
- new IgnitePredicate<Ignite>() {
- @Override public boolean apply(Ignite ignite) {
- return ignite.cluster().topologyVersion() == 4;
- }
- }))
- break;
-
- Thread.sleep(1000);
- }
-
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, three));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, four));
- assertFalse(g0.context().discovery().hasNearCache(null, one));
- assertTrue(g0.context().discovery().hasNearCache(null, two));
- assertTrue(g0.context().discovery().hasNearCache(null, three));
- assertTrue(g0.context().discovery().hasNearCache(null, four));
-
- assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, three));
- assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, four));
- assertTrue(g1.context().discovery().hasNearCache(null, three));
- assertTrue(g1.context().discovery().hasNearCache(null, four));
-
- stopGrid(1);
-
- // Wait all nodes are on version 5.
- for (;;) {
- if (F.forAll(
- Ignition.allGrids(),
- new IgnitePredicate<Ignite>() {
- @Override public boolean apply(Ignite ignite) {
- return ignite.cluster().topologyVersion() == 5;
- }
- }))
- break;
-
- Thread.sleep(1000);
- }
-
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, three));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, four));
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, five));
-
- assertFalse(g0.context().discovery().hasNearCache(null, one));
- assertTrue(g0.context().discovery().hasNearCache(null, two));
- assertTrue(g0.context().discovery().hasNearCache(null, three));
- assertTrue(g0.context().discovery().hasNearCache(null, four));
- assertFalse(g0.context().discovery().hasNearCache(null, five));
- }
-
- /**
- *
- */
- public static class RegularDiscovery extends GridDiscoveryManagerSelfTest {
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
-
- return cfg;
- }
- }
-
- /**
- *
- */
- public static class ClientDiscovery extends GridDiscoveryManagerSelfTest {
- // No-op.
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteTopologyPrintFormatSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteTopologyPrintFormatSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteTopologyPrintFormatSelfTest.java
index 58992af..86ad458 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteTopologyPrintFormatSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteTopologyPrintFormatSelfTest.java
@@ -50,16 +50,16 @@ public class IgniteTopologyPrintFormatSelfTest extends GridCommonAbstractTest {
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
TcpDiscoverySpi disc = new TcpDiscoverySpi();
disc.setIpFinder(IP_FINDER);
- if (gridName.endsWith("client"))
+ if (igniteInstanceName.endsWith("client"))
cfg.setClientMode(true);
- if (gridName.endsWith("client_force_server")) {
+ if (igniteInstanceName.endsWith("client_force_server")) {
cfg.setClientMode(true);
disc.setForceServerMode(true);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index b28619c..985dddb 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@ -42,7 +42,6 @@ import org.apache.ignite.internal.managers.communication.GridCommunicationSendMe
import org.apache.ignite.internal.managers.deployment.GridDeploymentManagerStopSelfTest;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManagerAliveCacheSelfTest;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManagerAttributesSelfTest;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryManagerSelfTest;
import org.apache.ignite.internal.managers.discovery.IgniteTopologyPrintFormatSelfTest;
import org.apache.ignite.internal.managers.events.GridEventStorageManagerSelfTest;
import org.apache.ignite.internal.managers.swapspace.GridSwapSpaceManagerSelfTest;
@@ -111,8 +110,6 @@ public class IgniteKernalSelfTestSuite extends TestSuite {
suite.addTestSuite(GridDiscoveryManagerAttributesSelfTest.RegularDiscovery.class);
suite.addTestSuite(GridDiscoveryManagerAttributesSelfTest.ClientDiscovery.class);
suite.addTestSuite(GridDiscoveryManagerAliveCacheSelfTest.class);
- suite.addTestSuite(GridDiscoveryManagerSelfTest.RegularDiscovery.class);
- suite.addTestSuite(GridDiscoveryManagerSelfTest.ClientDiscovery.class);
suite.addTestSuite(GridDiscoveryEventSelfTest.class);
suite.addTestSuite(GridPortProcessorSelfTest.class);
suite.addTestSuite(GridHomePathSelfTest.class);