You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/13 08:45:57 UTC

[36/50] [abbrv] ignite git commit: ignite-4779 Missed discovery data snapshot during exchange processing (do not use discovery manager cache to handle exchange)

http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index b5cb5cf..56acc26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridNodeOrderComparator;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -71,17 +72,19 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
      * @param ctx Context.
      * @param cacheName Cache name.
      * @param topVer Topology version.
+     * @param discoCache Discovery cache.
      */
     public GridDhtAssignmentFetchFuture(
         GridCacheSharedContext ctx,
         String cacheName,
-        AffinityTopologyVersion topVer
+        AffinityTopologyVersion topVer,
+        DiscoCache discoCache
     ) {
         this.ctx = ctx;
         int cacheId = CU.cacheId(cacheName);
         this.key = new T2<>(cacheId, topVer);
 
-        Collection<ClusterNode> availableNodes = ctx.discovery().cacheAffinityNodes(cacheId, topVer);
+        Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheId);
 
         LinkedList<ClusterNode> tmp = new LinkedList<>();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 966a186..84ff96b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -95,6 +96,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** */
     private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
 
+    /** Discovery cache. */
+    private volatile DiscoCache discoCache;
+
     /** */
     private volatile boolean stopping;
 
@@ -151,6 +155,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             rebalancedTopVer = AffinityTopologyVersion.NONE;
 
             topVer = AffinityTopologyVersion.NONE;
+
+            discoCache = cctx.discovery().discoCache();
         }
         finally {
             lock.writeLock().unlock();
@@ -293,6 +299,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             rebalancedTopVer = AffinityTopologyVersion.NONE;
 
             topVer = exchId.topologyVersion();
+
+            discoCache = exchFut.discoCache();
         }
         finally {
             lock.writeLock().unlock();
@@ -349,7 +357,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
         ClusterNode loc = cctx.localNode();
 
-        ClusterNode oldest = currentCoordinator();
+        ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
 
         GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
 
@@ -474,7 +482,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (exchId.isLeft())
                 removeNode(exchId.nodeId());
 
-            ClusterNode oldest = currentCoordinator();
+            ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
 
             if (log.isDebugEnabled())
                 log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
@@ -876,7 +884,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         AffinityTopologyVersion topVer,
         GridDhtPartitionState state,
         GridDhtPartitionState... states) {
-        Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null;
+        Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.cacheAffinityNodes(cctx.cacheId())) : null;
 
         lock.readLock().lock();
 
@@ -973,7 +981,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionFullMap partMap,
         @Nullable Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
@@ -1106,7 +1114,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionMap2 parts,
         @Nullable Map<Integer, Long> cntrMap,
         boolean checkEvictions) {
@@ -1278,7 +1286,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 List<ClusterNode> affNodes = aff.get(p);
 
                 if (!affNodes.contains(cctx.localNode())) {
-                    Collection<UUID> nodeIds = F.nodeIds(nodes(p, topVer, OWNING));
+                    List<ClusterNode> nodes = nodes(p, topVer, OWNING);
+                    Collection<UUID> nodeIds = F.nodeIds(nodes);
 
                     // If all affinity nodes are owners, then evict partition from local node.
                     if (nodeIds.containsAll(F.nodeIds(affNodes))) {
@@ -1296,15 +1305,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         int affCnt = affNodes.size();
 
                         if (ownerCnt > affCnt) {
-                            List<ClusterNode> sorted = new ArrayList<>(cctx.discovery().nodes(nodeIds));
-
                             // Sort by node orders in ascending order.
-                            Collections.sort(sorted, CU.nodeComparator(true));
+                            Collections.sort(nodes, CU.nodeComparator(true));
 
-                            int diff = sorted.size() - affCnt;
+                            int diff = nodes.size() - affCnt;
 
                             for (int i = 0; i < diff; i++) {
-                                ClusterNode n = sorted.get(i);
+                                ClusterNode n = nodes.get(i);
 
                                 if (locId.equals(n.id())) {
                                     part.rent(false);
@@ -1330,17 +1337,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /**
-     * @return Current coordinator node.
-     */
-    @Nullable private ClusterNode currentCoordinator() {
-        ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
-
-        assert oldest != null || cctx.kernalContext().clientNode();
-
-        return oldest;
-    }
-
-    /**
      * Updates value for single partition.
      *
      * @param p Partition.
@@ -1350,7 +1346,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) {
-        ClusterNode oldest = currentCoordinator();
+        ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
 
         assert oldest != null || cctx.kernalContext().clientNode();
 
@@ -1415,7 +1411,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private void removeNode(UUID nodeId) {
         assert nodeId != null;
 
-        ClusterNode oldest = CU.oldest(cctx.discovery().serverNodes(topVer));
+        ClusterNode oldest = discoCache.oldestAliveServerNode();
 
         assert oldest != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a334fd5..46fb144 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -44,7 +44,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
@@ -101,6 +101,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     /** Dummy reassign flag. */
     private final boolean reassign;
 
+    /** */
+    @GridToStringExclude
+    private volatile DiscoCache discoCache;
+
     /** Discovery event. */
     private volatile DiscoveryEvent discoEvt;
 
@@ -145,9 +149,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     /** */
     private boolean init;
 
-    /** Topology snapshot. */
-    private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>();
-
     /** Last committed cache version before next topology version use. */
     private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>();
 
@@ -335,6 +336,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
+     * @return Discovery cache.
+     */
+    public DiscoCache discoCache() {
+        return discoCache;
+    }
+
+    /**
      * @param cacheId Cache ID to check.
      * @param topVer Topology version.
      * @return {@code True} if cache was added during this exchange.
@@ -377,11 +385,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      *
      * @param exchId Exchange ID.
      * @param discoEvt Discovery event.
+     * @param discoCache Discovery data cache.
      */
-    public void onEvent(GridDhtPartitionExchangeId exchId, DiscoveryEvent discoEvt) {
+    public void onEvent(GridDhtPartitionExchangeId exchId, DiscoveryEvent discoEvt, DiscoCache discoCache) {
         assert exchId.equals(this.exchId);
 
         this.discoEvt = discoEvt;
+        this.discoCache = discoCache;
 
         evtLatch.countDown();
     }
@@ -445,7 +455,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         assert !dummy && !forcePreload : this;
 
         try {
-            srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topologyVersion()));
+            discoCache.updateAlives(cctx.discovery());
+
+            srvNodes = new ArrayList<>(discoCache.serverNodes());
 
             remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId()))));
 
@@ -560,7 +572,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
 
                 if (updateTop && clientTop != null)
-                    cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false));
+                    top.update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false));
             }
 
             top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId()));
@@ -852,7 +864,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         List<String> cachesWithoutNodes = null;
 
         for (String name : cctx.cache().cacheNames()) {
-            if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) {
+            if (discoCache.cacheAffinityNodes(name).isEmpty()) {
                 if (cachesWithoutNodes == null)
                     cachesWithoutNodes = new ArrayList<>();
 
@@ -1106,7 +1118,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * Cleans up resources to avoid excessive memory usage.
      */
     public void cleanUp() {
-        topSnapshot.set(null);
         singleMsgs.clear();
         fullMsgs.clear();
         crd = null;
@@ -1261,7 +1272,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         try {
             assert crd.isLocal();
 
-            if (!crd.equals(cctx.discovery().serverNodes(topologyVersion()).get(0))) {
+            if (!crd.equals(discoCache.serverNodes().get(0))) {
                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                     if (!cacheCtx.isLocal())
                         cacheCtx.topology().beforeExchange(GridDhtPartitionsExchangeFuture.this, !centralizedAff);
@@ -1570,6 +1581,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                         ClusterNode crd0;
 
+                        discoCache.updateAlives(node);
+
                         synchronized (mux) {
                             if (!srvNodes.remove(node))
                                 return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a61a98ad/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index d26242d..99146aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -48,7 +48,6 @@ import org.apache.ignite.compute.ComputeJobContext;
 import org.apache.ignite.configuration.DeploymentMode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridClosureCallMode;
 import org.apache.ignite.internal.GridKernalContext;
@@ -58,8 +57,9 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
@@ -167,7 +167,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     private IgniteInternalCache<Object, Object> cache;
 
     /** Topology listener. */
-    private GridLocalEventListener topLsnr = new TopologyListener();
+    private DiscoveryEventListener topLsnr = new TopologyListener();
 
     static {
         Set<IgniteProductVersion> versions = new TreeSet<>(new Comparator<IgniteProductVersion>() {
@@ -251,7 +251,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         cache = ctx.cache().utilityCache();
 
         if (!ctx.clientNode())
-            ctx.event().addLocalEventListener(topLsnr, EVTS);
+            ctx.event().addDiscoveryEventListener(topLsnr, EVTS);
 
         try {
             if (ctx.deploy().enabled())
@@ -314,7 +314,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         busyLock.block();
 
         if (!ctx.clientNode())
-            ctx.event().removeLocalEventListener(topLsnr);
+            ctx.event().removeDiscoveryEventListener(topLsnr);
 
         Collection<ServiceContextImpl> ctxs = new ArrayList<>();
 
@@ -1568,9 +1568,9 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /**
      * Topology listener.
      */
-    private class TopologyListener implements GridLocalEventListener {
+    private class TopologyListener implements DiscoveryEventListener {
         /** {@inheritDoc} */
-        @Override public void onEvent(Event evt) {
+        @Override public void onEvent(DiscoveryEvent evt, final DiscoCache discoCache) {
             if (!busyLock.enterBusy())
                 return;
 
@@ -1588,11 +1588,14 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                     }
                 }
                 else
-                    topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
+                    topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0);
 
                 depExe.execute(new BusyRunnable() {
                     @Override public void run0() {
-                        ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
+                        // In case the cache instance isn't tracked by DiscoveryManager anymore.
+                        discoCache.updateAlives(ctx.discovery());
+
+                        ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
 
                         if (oldest != null && oldest.isLocal()) {
                             final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();