You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/13 10:28:05 UTC
[31/50] [abbrv] ignite git commit: ignite-4779 Missed discovery data
snapshot during exchange processing (do not use discovery manager cache to
handle exchange)
http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index b5cb5cf..56acc26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridNodeOrderComparator;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -71,17 +72,19 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
* @param ctx Context.
* @param cacheName Cache name.
* @param topVer Topology version.
+ * @param discoCache Discovery cache.
*/
public GridDhtAssignmentFetchFuture(
GridCacheSharedContext ctx,
String cacheName,
- AffinityTopologyVersion topVer
+ AffinityTopologyVersion topVer,
+ DiscoCache discoCache
) {
this.ctx = ctx;
int cacheId = CU.cacheId(cacheName);
this.key = new T2<>(cacheId, topVer);
- Collection<ClusterNode> availableNodes = ctx.discovery().cacheAffinityNodes(cacheId, topVer);
+ Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheId);
LinkedList<ClusterNode> tmp = new LinkedList<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 966a186..84ff96b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -95,6 +96,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** */
private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
+ /** Discovery cache. */
+ private volatile DiscoCache discoCache;
+
/** */
private volatile boolean stopping;
@@ -151,6 +155,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
rebalancedTopVer = AffinityTopologyVersion.NONE;
topVer = AffinityTopologyVersion.NONE;
+
+ discoCache = cctx.discovery().discoCache();
}
finally {
lock.writeLock().unlock();
@@ -293,6 +299,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
rebalancedTopVer = AffinityTopologyVersion.NONE;
topVer = exchId.topologyVersion();
+
+ discoCache = exchFut.discoCache();
}
finally {
lock.writeLock().unlock();
@@ -349,7 +357,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
ClusterNode loc = cctx.localNode();
- ClusterNode oldest = currentCoordinator();
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
@@ -474,7 +482,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (exchId.isLeft())
removeNode(exchId.nodeId());
- ClusterNode oldest = currentCoordinator();
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
if (log.isDebugEnabled())
log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
@@ -876,7 +884,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
AffinityTopologyVersion topVer,
GridDhtPartitionState state,
GridDhtPartitionState... states) {
- Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null;
+ Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.cacheAffinityNodes(cctx.cacheId())) : null;
lock.readLock().lock();
@@ -973,7 +981,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionFullMap partMap,
@Nullable Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
@@ -1106,7 +1114,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionMap2 parts,
@Nullable Map<Integer, Long> cntrMap,
boolean checkEvictions) {
@@ -1278,7 +1286,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
List<ClusterNode> affNodes = aff.get(p);
if (!affNodes.contains(cctx.localNode())) {
- Collection<UUID> nodeIds = F.nodeIds(nodes(p, topVer, OWNING));
+ List<ClusterNode> nodes = nodes(p, topVer, OWNING);
+ Collection<UUID> nodeIds = F.nodeIds(nodes);
// If all affinity nodes are owners, then evict partition from local node.
if (nodeIds.containsAll(F.nodeIds(affNodes))) {
@@ -1296,15 +1305,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
int affCnt = affNodes.size();
if (ownerCnt > affCnt) {
- List<ClusterNode> sorted = new ArrayList<>(cctx.discovery().nodes(nodeIds));
-
// Sort by node orders in ascending order.
- Collections.sort(sorted, CU.nodeComparator(true));
+ Collections.sort(nodes, CU.nodeComparator(true));
- int diff = sorted.size() - affCnt;
+ int diff = nodes.size() - affCnt;
for (int i = 0; i < diff; i++) {
- ClusterNode n = sorted.get(i);
+ ClusterNode n = nodes.get(i);
if (locId.equals(n.id())) {
part.rent(false);
@@ -1330,17 +1337,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/**
- * @return Current coordinator node.
- */
- @Nullable private ClusterNode currentCoordinator() {
- ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
-
- assert oldest != null || cctx.kernalContext().clientNode();
-
- return oldest;
- }
-
- /**
* Updates value for single partition.
*
* @param p Partition.
@@ -1350,7 +1346,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
*/
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) {
- ClusterNode oldest = currentCoordinator();
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
assert oldest != null || cctx.kernalContext().clientNode();
@@ -1415,7 +1411,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
private void removeNode(UUID nodeId) {
assert nodeId != null;
- ClusterNode oldest = CU.oldest(cctx.discovery().serverNodes(topVer));
+ ClusterNode oldest = discoCache.oldestAliveServerNode();
assert oldest != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a334fd5..46fb144 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -44,7 +44,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
@@ -101,6 +101,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** Dummy reassign flag. */
private final boolean reassign;
+ /** */
+ @GridToStringExclude
+ private volatile DiscoCache discoCache;
+
/** Discovery event. */
private volatile DiscoveryEvent discoEvt;
@@ -145,9 +149,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** */
private boolean init;
- /** Topology snapshot. */
- private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>();
-
/** Last committed cache version before next topology version use. */
private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>();
@@ -335,6 +336,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
+ * @return Discovery cache.
+ */
+ public DiscoCache discoCache() {
+ return discoCache;
+ }
+
+ /**
* @param cacheId Cache ID to check.
* @param topVer Topology version.
* @return {@code True} if cache was added during this exchange.
@@ -377,11 +385,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
*
* @param exchId Exchange ID.
* @param discoEvt Discovery event.
+ * @param discoCache Discovery data cache.
*/
- public void onEvent(GridDhtPartitionExchangeId exchId, DiscoveryEvent discoEvt) {
+ public void onEvent(GridDhtPartitionExchangeId exchId, DiscoveryEvent discoEvt, DiscoCache discoCache) {
assert exchId.equals(this.exchId);
this.discoEvt = discoEvt;
+ this.discoCache = discoCache;
evtLatch.countDown();
}
@@ -445,7 +455,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
assert !dummy && !forcePreload : this;
try {
- srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topologyVersion()));
+ discoCache.updateAlives(cctx.discovery());
+
+ srvNodes = new ArrayList<>(discoCache.serverNodes());
remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId()))));
@@ -560,7 +572,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
if (updateTop && clientTop != null)
- cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false));
+ top.update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false));
}
top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId()));
@@ -852,7 +864,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
List<String> cachesWithoutNodes = null;
for (String name : cctx.cache().cacheNames()) {
- if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) {
+ if (discoCache.cacheAffinityNodes(name).isEmpty()) {
if (cachesWithoutNodes == null)
cachesWithoutNodes = new ArrayList<>();
@@ -1106,7 +1118,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* Cleans up resources to avoid excessive memory usage.
*/
public void cleanUp() {
- topSnapshot.set(null);
singleMsgs.clear();
fullMsgs.clear();
crd = null;
@@ -1261,7 +1272,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
try {
assert crd.isLocal();
- if (!crd.equals(cctx.discovery().serverNodes(topologyVersion()).get(0))) {
+ if (!crd.equals(discoCache.serverNodes().get(0))) {
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (!cacheCtx.isLocal())
cacheCtx.topology().beforeExchange(GridDhtPartitionsExchangeFuture.this, !centralizedAff);
@@ -1570,6 +1581,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
ClusterNode crd0;
+ discoCache.updateAlives(node);
+
synchronized (mux) {
if (!srvNodes.remove(node))
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index d26242d..99146aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -48,7 +48,6 @@ import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridClosureCallMode;
import org.apache.ignite.internal.GridKernalContext;
@@ -58,8 +57,9 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
@@ -167,7 +167,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
private IgniteInternalCache<Object, Object> cache;
/** Topology listener. */
- private GridLocalEventListener topLsnr = new TopologyListener();
+ private DiscoveryEventListener topLsnr = new TopologyListener();
static {
Set<IgniteProductVersion> versions = new TreeSet<>(new Comparator<IgniteProductVersion>() {
@@ -251,7 +251,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
cache = ctx.cache().utilityCache();
if (!ctx.clientNode())
- ctx.event().addLocalEventListener(topLsnr, EVTS);
+ ctx.event().addDiscoveryEventListener(topLsnr, EVTS);
try {
if (ctx.deploy().enabled())
@@ -314,7 +314,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
busyLock.block();
if (!ctx.clientNode())
- ctx.event().removeLocalEventListener(topLsnr);
+ ctx.event().removeDiscoveryEventListener(topLsnr);
Collection<ServiceContextImpl> ctxs = new ArrayList<>();
@@ -1568,9 +1568,9 @@ public class GridServiceProcessor extends GridProcessorAdapter {
/**
* Topology listener.
*/
- private class TopologyListener implements GridLocalEventListener {
+ private class TopologyListener implements DiscoveryEventListener {
/** {@inheritDoc} */
- @Override public void onEvent(Event evt) {
+ @Override public void onEvent(DiscoveryEvent evt, final DiscoCache discoCache) {
if (!busyLock.enterBusy())
return;
@@ -1588,11 +1588,14 @@ public class GridServiceProcessor extends GridProcessorAdapter {
}
}
else
- topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
+ topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0);
depExe.execute(new BusyRunnable() {
@Override public void run0() {
- ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
+ // In case the cache instance isn't tracked by DiscoveryManager anymore.
+ discoCache.updateAlives(ctx.discovery());
+
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
if (oldest != null && oldest.isLocal()) {
final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();