You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/18 15:04:12 UTC
[13/46] ignite git commit: Merge branch ignite-1.8.4-p1 into
ignite-1.8.5-p1
Merge branch ignite-1.8.4-p1 into ignite-1.8.5-p1
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4fce2805
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4fce2805
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4fce2805
Branch: refs/heads/ignite-1561-1
Commit: 4fce28054bc325741f65035ae384f9b4b9c3fee8
Parents: 62dbba8 8273e67
Author: Alexander Fedotov <al...@gmail.com>
Authored: Fri Apr 7 16:06:34 2017 +0300
Committer: Alexander Fedotov <al...@gmail.com>
Committed: Fri Apr 7 16:06:34 2017 +0300
----------------------------------------------------------------------
.../internal/managers/discovery/DiscoCache.java | 310 ++++++++
.../discovery/GridDiscoveryManager.java | 710 ++++++-------------
.../eventstorage/DiscoveryEventListener.java | 33 +
.../eventstorage/GridEventStorageManager.java | 162 ++++-
.../affinity/GridAffinityAssignmentCache.java | 7 +-
.../cache/CacheAffinitySharedManager.java | 35 +-
.../cache/GridCacheAffinityManager.java | 2 +-
.../GridCachePartitionExchangeManager.java | 64 +-
.../dht/GridClientPartitionTopology.java | 20 +-
.../dht/GridDhtAssignmentFetchFuture.java | 7 +-
.../dht/GridDhtPartitionTopologyImpl.java | 44 +-
.../dht/atomic/GridDhtAtomicCache.java | 4 +-
.../GridDhtPartitionsExchangeFuture.java | 33 +-
.../service/GridServiceProcessor.java | 21 +-
.../GridDiscoveryManagerAliveCacheSelfTest.java | 64 +-
.../GridDiscoveryManagerAttributesSelfTest.java | 14 +-
.../discovery/GridDiscoveryManagerSelfTest.java | 214 ------
.../IgniteTopologyPrintFormatSelfTest.java | 8 +-
.../testsuites/IgniteKernalSelfTestSuite.java | 3 -
19 files changed, 854 insertions(+), 901 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 2ec1070,80549dc..53e6069
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@@ -112,8 -108,8 +108,9 @@@ import org.apache.ignite.spi.discovery.
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.thread.IgniteThread;
+ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@@ -1892,29 -1901,114 +1902,137 @@@ public class GridDiscoveryManager exten
}
/**
+ * @return {@code True} if the local node is a client and the discovery SPI supports reconnect.
+ */
+ public boolean reconnectSupported() {
+ DiscoverySpi spi = getSpi();
+
+ return ctx.clientNode() && (spi instanceof TcpDiscoverySpi) &&
+ !(((TcpDiscoverySpi) spi).isClientReconnectDisabled());
+ }
+
+ /**
+ * Leaves the cluster and tries to join it again; must only be called when {@link #reconnectSupported()} is {@code true}.
+ *
+ * @throws IgniteSpiException If failed.
+ */
+ public void reconnect() {
+ assert reconnectSupported();
+
+ DiscoverySpi discoverySpi = getSpi();
+
+ ((TcpDiscoverySpi)discoverySpi).reconnect();
+ }
+
+ /**
+ * @param loc Local node.
+ * @param topSnapshot Topology snapshot.
+ * @return Newly created discovery cache.
+ */
+ @NotNull private DiscoCache createDiscoCache(ClusterNode loc, Collection<ClusterNode> topSnapshot) {
+ HashSet<UUID> alives = U.newHashSet(topSnapshot.size());
+ HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size());
+
+ ArrayList<ClusterNode> daemonNodes = new ArrayList<>(topSnapshot.size());
+ ArrayList<ClusterNode> srvNodes = new ArrayList<>(topSnapshot.size());
+ ArrayList<ClusterNode> rmtNodes = new ArrayList<>(topSnapshot.size());
+ ArrayList<ClusterNode> allNodes = new ArrayList<>(topSnapshot.size());
+
+ for (ClusterNode node : topSnapshot) {
+ if (alive(node))
+ alives.add(node.id());
+
+ if (node.isDaemon())
+ daemonNodes.add(node);
+ else {
+ allNodes.add(node);
+
+ if (!node.isLocal())
+ rmtNodes.add(node);
+
+ if (!CU.clientNode(node))
+ srvNodes.add(node);
+ }
+
+ nodeMap.put(node.id(), node);
+ }
+
+ assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" +
+ " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']';
+
+ Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size());
+ Map<Integer, List<ClusterNode>> affCacheNodes = U.newHashMap(allNodes.size());
+
+ Set<ClusterNode> allNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+ Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+ Set<ClusterNode> srvNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+
+ Set<Integer> nearEnabledCaches = new HashSet<>();
+
+ for (ClusterNode node : allNodes) {
+ assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
+ assert !node.isDaemon();
+
+ for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) {
+ String cacheName = entry.getKey();
+ CachePredicate filter = entry.getValue();
+
+ if (filter.cacheNode(node)) {
+ allNodesWithCaches.add(node);
+
+ if(!CU.clientNode(node))
+ srvNodesWithCaches.add(node);
+
+ if (!node.isLocal())
+ rmtNodesWithCaches.add(node);
+
+ addToMap(allCacheNodes, cacheName, node);
+
+ if (filter.dataNode(node))
+ addToMap(affCacheNodes, cacheName, node);
+
+ if (filter.nearNode(node))
+ nearEnabledCaches.add(CU.cacheId(cacheName));
+ }
+ }
+ }
+
+ return new DiscoCache(
+ loc,
+ Collections.unmodifiableList(rmtNodes),
+ Collections.unmodifiableList(allNodes),
+ Collections.unmodifiableList(srvNodes),
+ Collections.unmodifiableList(daemonNodes),
+ U.sealList(srvNodesWithCaches),
+ U.sealList(allNodesWithCaches),
+ U.sealList(rmtNodesWithCaches),
+ Collections.unmodifiableMap(allCacheNodes),
+ Collections.unmodifiableMap(affCacheNodes),
+ Collections.unmodifiableMap(nodeMap),
+ Collections.unmodifiableSet(nearEnabledCaches),
+ alives);
+ }
+
+ /**
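+ * Adds the node to the list of nodes stored in the map under the ID of the given cache.
+ *
+ * @param cacheMap Map to add to, keyed by {@code CU.cacheId(cacheName)}.
+ * @param cacheName Cache name.
+ * @param rich Node to add.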
+ */
+ private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
+ List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName));
+
+ if (cacheNodes == null) {
+ cacheNodes = new ArrayList<>();
+
+ cacheMap.put(CU.cacheId(cacheName), cacheNodes);
+ }
+
+ cacheNodes.add(rich);
+ }
+
+ /**
* Updates topology version if current version is smaller than updated.
*
* @param updated Updated topology version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 6ced5e6,99146aa..adfbc11
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@@ -315,10 -313,8 +315,10 @@@ public class GridServiceProcessor exten
busyLock.block();
+ U.shutdownNow(GridServiceProcessor.class, depExe, log);
+
if (!ctx.clientNode())
- ctx.event().removeLocalEventListener(topLsnr);
+ ctx.event().removeDiscoveryEventListener(topLsnr);
Collection<ServiceContextImpl> ctxs = new ArrayList<>();
@@@ -1576,19 -1586,16 +1576,22 @@@
if (!((CacheAffinityChangeMessage)msg).exchangeNeeded())
return;
}
+ else if (msg instanceof DynamicCacheChangeBatch) {
+ if (!((DynamicCacheChangeBatch)msg).exchangeNeeded())
+ return;
+ }
+ else
+ return;
}
else
- topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
+ topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0);
- depExe.execute(new BusyRunnable() {
+ depExe.execute(new DepRunnable() {
@Override public void run0() {
- ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
+ // Update alive nodes explicitly in case this DiscoCache instance isn't tracked by DiscoveryManager anymore.
+ discoCache.updateAlives(ctx.discovery());
+
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
if (oldest != null && oldest.isLocal()) {
final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/4fce2805/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------