You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/18 15:04:12 UTC

[13/46] ignite git commit: Merge branch ignite-1.8.4-p1 into ignite-1.8.5-p1

Merge branch ignite-1.8.4-p1 into ignite-1.8.5-p1

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4fce2805
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4fce2805
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4fce2805

Branch: refs/heads/ignite-1561-1
Commit: 4fce28054bc325741f65035ae384f9b4b9c3fee8
Parents: 62dbba8 8273e67
Author: Alexander Fedotov <al...@gmail.com>
Authored: Fri Apr 7 16:06:34 2017 +0300
Committer: Alexander Fedotov <al...@gmail.com>
Committed: Fri Apr 7 16:06:34 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/DiscoCache.java | 310 ++++++++
 .../discovery/GridDiscoveryManager.java         | 710 ++++++-------------
 .../eventstorage/DiscoveryEventListener.java    |  33 +
 .../eventstorage/GridEventStorageManager.java   | 162 ++++-
 .../affinity/GridAffinityAssignmentCache.java   |   7 +-
 .../cache/CacheAffinitySharedManager.java       |  35 +-
 .../cache/GridCacheAffinityManager.java         |   2 +-
 .../GridCachePartitionExchangeManager.java      |  64 +-
 .../dht/GridClientPartitionTopology.java        |  20 +-
 .../dht/GridDhtAssignmentFetchFuture.java       |   7 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  44 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   4 +-
 .../GridDhtPartitionsExchangeFuture.java        |  33 +-
 .../service/GridServiceProcessor.java           |  21 +-
 .../GridDiscoveryManagerAliveCacheSelfTest.java |  64 +-
 .../GridDiscoveryManagerAttributesSelfTest.java |  14 +-
 .../discovery/GridDiscoveryManagerSelfTest.java | 214 ------
 .../IgniteTopologyPrintFormatSelfTest.java      |   8 +-
 .../testsuites/IgniteKernalSelfTestSuite.java   |   3 -
 19 files changed, 854 insertions(+), 901 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 2ec1070,80549dc..53e6069
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@@ -112,8 -108,8 +108,9 @@@ import org.apache.ignite.spi.discovery.
  import org.apache.ignite.spi.discovery.DiscoverySpiListener;
  import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
  import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
 +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
  import org.apache.ignite.thread.IgniteThread;
+ import org.jetbrains.annotations.NotNull;
  import org.jetbrains.annotations.Nullable;
  import org.jsr166.ConcurrentHashMap8;
  
@@@ -1892,29 -1901,114 +1902,137 @@@ public class GridDiscoveryManager exten
      }
  
      /**
 +     * @return {@code True} if local node is a client and discovery SPI is TCP-based with reconnect enabled.
 +     */
 +    public boolean reconnectSupported() {
 +        DiscoverySpi spi = getSpi();
 +
 +        return ctx.clientNode() && (spi instanceof TcpDiscoverySpi) &&
 +            !(((TcpDiscoverySpi) spi).isClientReconnectDisabled());
 +    }
 +
 +    /**
 +     * Leaves the cluster and tries to join again; requires {@link #reconnectSupported()} (asserted).
 +     *
 +     * @throws IgniteSpiException If failed.
 +     */
 +    public void reconnect() {
 +        assert reconnectSupported();
 +
 +        DiscoverySpi discoverySpi = getSpi();
 +
 +        ((TcpDiscoverySpi)discoverySpi).reconnect();
 +    }
 +
 +    /**
+      * @param loc Local node; asserted to be absent from the collected remote nodes.
+      * @param topSnapshot Topology snapshot; its nodes are grouped into daemon, server, remote and per-cache lists.
+      * @return Newly created discovery cache built from the collected node groups.
+      */
+     @NotNull private DiscoCache createDiscoCache(ClusterNode loc, Collection<ClusterNode> topSnapshot) {
+         HashSet<UUID> alives = U.newHashSet(topSnapshot.size());
+         HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size());
+ 
+         ArrayList<ClusterNode> daemonNodes = new ArrayList<>(topSnapshot.size());
+         ArrayList<ClusterNode> srvNodes = new ArrayList<>(topSnapshot.size());
+         ArrayList<ClusterNode> rmtNodes = new ArrayList<>(topSnapshot.size());
+         ArrayList<ClusterNode> allNodes = new ArrayList<>(topSnapshot.size());
+ 
+         for (ClusterNode node : topSnapshot) {
+             if (alive(node))
+                 alives.add(node.id());
+ 
+             if (node.isDaemon())
+                 daemonNodes.add(node);
+             else {
+                 allNodes.add(node);
+ 
+                 if (!node.isLocal())
+                     rmtNodes.add(node);
+ 
+                 if (!CU.clientNode(node))
+                     srvNodes.add(node);
+             }
+ 
+             nodeMap.put(node.id(), node);
+         }
+ 
+         assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" +
+             " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']';
+ 
+         Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size());
+         Map<Integer, List<ClusterNode>> affCacheNodes = U.newHashMap(allNodes.size());
+ 
+         Set<ClusterNode> allNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+         Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+         Set<ClusterNode> srvNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+ 
+         Set<Integer> nearEnabledCaches = new HashSet<>();
+ 
+         for (ClusterNode node : allNodes) {
+             assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
+             assert !node.isDaemon();
+ 
+             for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) {
+                 String cacheName = entry.getKey();
+                 CachePredicate filter = entry.getValue();
+ 
+                 if (filter.cacheNode(node)) {
+                     allNodesWithCaches.add(node);
+ 
+                     if(!CU.clientNode(node))
+                         srvNodesWithCaches.add(node);
+ 
+                     if (!node.isLocal())
+                         rmtNodesWithCaches.add(node);
+ 
+                     addToMap(allCacheNodes, cacheName, node);
+ 
+                     if (filter.dataNode(node))
+                         addToMap(affCacheNodes, cacheName, node);
+ 
+                     if (filter.nearNode(node))
+                         nearEnabledCaches.add(CU.cacheId(cacheName));
+                 }
+             }
+         }
+ 
+         return new DiscoCache(
+             loc,
+             Collections.unmodifiableList(rmtNodes),
+             Collections.unmodifiableList(allNodes),
+             Collections.unmodifiableList(srvNodes),
+             Collections.unmodifiableList(daemonNodes),
+             U.sealList(srvNodesWithCaches),
+             U.sealList(allNodesWithCaches),
+             U.sealList(rmtNodesWithCaches),
+             Collections.unmodifiableMap(allCacheNodes),
+             Collections.unmodifiableMap(affCacheNodes),
+             Collections.unmodifiableMap(nodeMap),
+             Collections.unmodifiableSet(nearEnabledCaches),
+             alives);
+     }
+ 
+     /**
+      * Adds node to the list stored under the cache ID, creating that list on first use.
+      *
+      * @param cacheMap Map from cache ID to cache nodes to add to.
+      * @param cacheName Cache name; converted to the map key with {@code CU.cacheId}.
+      * @param rich Node to add.
+      */
+     private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
+         List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName));
+ 
+         if (cacheNodes == null) {
+             cacheNodes = new ArrayList<>();
+ 
+             cacheMap.put(CU.cacheId(cacheName), cacheNodes);
+         }
+ 
+         cacheNodes.add(rich);
+     }
+ 
+     /**
       * Updates topology version if current version is smaller than updated.
       *
       * @param updated Updated topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 6ced5e6,99146aa..adfbc11
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@@ -315,10 -313,8 +315,10 @@@ public class GridServiceProcessor exten
  
          busyLock.block();
  
 +        U.shutdownNow(GridServiceProcessor.class, depExe, log);
 +
          if (!ctx.clientNode())
-             ctx.event().removeLocalEventListener(topLsnr);
+             ctx.event().removeDiscoveryEventListener(topLsnr);
  
          Collection<ServiceContextImpl> ctxs = new ArrayList<>();
  
@@@ -1576,19 -1586,16 +1576,22 @@@
                          if (!((CacheAffinityChangeMessage)msg).exchangeNeeded())
                              return;
                      }
 +                    else if (msg instanceof DynamicCacheChangeBatch) {
 +                        if (!((DynamicCacheChangeBatch)msg).exchangeNeeded())
 +                            return;
 +                    }
 +                    else
 +                        return;
                  }
                  else
-                     topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
+                     topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0);
  
 -                depExe.execute(new BusyRunnable() {
 +                depExe.execute(new DepRunnable() {
                      @Override public void run0() {
-                         ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
+                         // In case the cache instance isn't tracked by DiscoveryManager anymore.
+                         discoCache.updateAlives(ctx.discovery());
+ 
+                         ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
  
                          if (oldest != null && oldest.isLocal()) {
                              final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------