You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2017/03/22 13:43:05 UTC
[3/3] ignite git commit: Merge branch 'ignite-1.7.9-p1' into
apache-master
Merge branch 'ignite-1.7.9-p1' into apache-master
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
# modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
# modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/16153bb8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/16153bb8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/16153bb8
Branch: refs/heads/apache-master
Commit: 16153bb8bae9153ae979c13a93e60daab240d5ee
Parents: 117e18e d124004
Author: dkarachentsev <dk...@gridgain.com>
Authored: Wed Mar 22 16:36:27 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Wed Mar 22 16:36:27 2017 +0300
----------------------------------------------------------------------
.../ignite/internal/GridKernalGatewayImpl.java | 8 +-
.../apache/ignite/internal/IgniteKernal.java | 120 +++++-
.../internal/IgniteNeedReconnectException.java | 40 ++
.../discovery/GridDiscoveryManager.java | 24 ++
.../GridCachePartitionExchangeManager.java | 25 +-
.../dht/GridDhtAssignmentFetchFuture.java | 13 +-
.../GridDhtPartitionsExchangeFuture.java | 46 ++-
.../service/GridServiceProcessor.java | 101 ++---
.../ignite/spi/discovery/tcp/ClientImpl.java | 203 ++++++++--
.../ignite/spi/discovery/tcp/ServerImpl.java | 5 +
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 8 +
.../spi/discovery/tcp/TcpDiscoverySpi.java | 9 +
.../IgniteClientReconnectCacheTest.java | 7 +-
.../ignite/internal/IgniteClientRejoinTest.java | 378 +++++++++++++++++++
.../GridServiceProcessorStopSelfTest.java | 75 ++++
.../tcp/TcpClientDiscoverySpiSelfTest.java | 48 ++-
.../IgniteClientReconnectTestSuite.java | 2 +
17 files changed, 1013 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 2a6706e,25f7884..0ea6ea4
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@@ -3620,25 -3576,45 +3676,65 @@@ public class IgniteKernal implements Ig
}
/**
+ * @param node Node.
+ * @param payload Message payload.
+ * @param procFromNioThread If {@code true} message is processed from NIO thread.
+ * @return Response future.
+ */
+ public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) {
+ return ctx.io().sendIoTest(node, payload, procFromNioThread);
+ }
+
+ /**
+ * @param nodes Nodes.
+ * @param payload Message payload.
+ * @param procFromNioThread If {@code true} message is processed from NIO thread.
+ * @return Response future.
+ */
+ public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) {
+ return ctx.io().sendIoTest(nodes, payload, procFromNioThread);
+ }
+
++ /**
+ *
+ */
+ private class ReconnectState {
+ /** */
+ private final GridFutureAdapter firstReconnectFut = new GridFutureAdapter();
+
+ /** */
+ private GridCompoundFuture<?, Object> curReconnectFut;
+
+ /** */
+ private GridFutureAdapter<?> reconnectDone;
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ void waitFirstReconnect() throws IgniteCheckedException {
+ firstReconnectFut.get();
+ }
+
+ /**
+ *
+ */
+ void waitPreviousReconnect() {
+ if (curReconnectFut != null && !curReconnectFut.isDone()) {
+ assert reconnectDone != null;
+
+ curReconnectFut.onDone(STOP_RECONNECT);
+
+ try {
+ reconnectDone.get();
+ }
+ catch (IgniteCheckedException ignote) {
+ // No-op.
+ }
+ }
+
+ }
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteKernal.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index d637de4,2ec1070..b2c4ced
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@@ -108,8 -112,8 +108,9 @@@ import org.apache.ignite.spi.discovery.
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
+ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@@ -1903,114 -1892,29 +1904,137 @@@ public class GridDiscoveryManager exten
}
/**
+ * @return {@code True} if local node client and discovery SPI supports reconnect.
+ */
+ public boolean reconnectSupported() {
+ DiscoverySpi spi = getSpi();
+
+ return ctx.clientNode() && (spi instanceof TcpDiscoverySpi) &&
+ !(((TcpDiscoverySpi) spi).isClientReconnectDisabled());
+ }
+
+ /**
+ * Leave cluster and try to join again.
+ *
+ * @throws IgniteSpiException If failed.
+ */
+ public void reconnect() {
+ assert reconnectSupported();
+
+ DiscoverySpi discoverySpi = getSpi();
+
+ ((TcpDiscoverySpi)discoverySpi).reconnect();
+ }
+
+ /**
+ * @param loc Local node.
+ * @param topSnapshot Topology snapshot.
+ * @return Newly created discovery cache.
+ */
+ @NotNull private DiscoCache createDiscoCache(ClusterNode loc, Collection<ClusterNode> topSnapshot) {
+ HashSet<UUID> alives = U.newHashSet(topSnapshot.size());
+ HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size());
+
+ ArrayList<ClusterNode> daemonNodes = new ArrayList<>(topSnapshot.size());
+ ArrayList<ClusterNode> srvNodes = new ArrayList<>(topSnapshot.size());
+ ArrayList<ClusterNode> rmtNodes = new ArrayList<>(topSnapshot.size());
+ ArrayList<ClusterNode> allNodes = new ArrayList<>(topSnapshot.size());
+
+ for (ClusterNode node : topSnapshot) {
+ if (alive(node))
+ alives.add(node.id());
+
+ if (node.isDaemon())
+ daemonNodes.add(node);
+ else {
+ allNodes.add(node);
+
+ if (!node.isLocal())
+ rmtNodes.add(node);
+
+ if (!CU.clientNode(node))
+ srvNodes.add(node);
+ }
+
+ nodeMap.put(node.id(), node);
+ }
+
+ assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" +
+ " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']';
+
+ Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size());
+ Map<Integer, List<ClusterNode>> affCacheNodes = U.newHashMap(allNodes.size());
+
+ Set<ClusterNode> allNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+ Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+ Set<ClusterNode> srvNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+
+ Set<Integer> nearEnabledCaches = new HashSet<>();
+
+ for (ClusterNode node : allNodes) {
+ assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
+ assert !node.isDaemon();
+
+ for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) {
+ String cacheName = entry.getKey();
+ CachePredicate filter = entry.getValue();
+
+ if (filter.cacheNode(node)) {
+ allNodesWithCaches.add(node);
+
+ if(!CU.clientNode(node))
+ srvNodesWithCaches.add(node);
+
+ if (!node.isLocal())
+ rmtNodesWithCaches.add(node);
+
+ addToMap(allCacheNodes, cacheName, node);
+
+ if (filter.dataNode(node))
+ addToMap(affCacheNodes, cacheName, node);
+
+ if (filter.nearNode(node))
+ nearEnabledCaches.add(CU.cacheId(cacheName));
+ }
+ }
+ }
+
+ return new DiscoCache(
+ loc,
+ Collections.unmodifiableList(rmtNodes),
+ Collections.unmodifiableList(allNodes),
+ Collections.unmodifiableList(srvNodes),
+ Collections.unmodifiableList(daemonNodes),
+ U.sealList(srvNodesWithCaches),
+ U.sealList(allNodesWithCaches),
+ U.sealList(rmtNodesWithCaches),
+ Collections.unmodifiableMap(allCacheNodes),
+ Collections.unmodifiableMap(affCacheNodes),
+ Collections.unmodifiableMap(nodeMap),
+ Collections.unmodifiableSet(nearEnabledCaches),
+ alives);
+ }
+
+ /**
+ * Adds node to map.
+ *
+ * @param cacheMap Map to add to.
+ * @param cacheName Cache name.
+ * @param rich Node to add
+ */
+ private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
+ List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName));
+
+ if (cacheNodes == null) {
+ cacheNodes = new ArrayList<>();
+
+ cacheMap.put(CU.cacheId(cacheName), cacheNodes);
+ }
+
+ cacheNodes.add(rich);
+ }
+
+ /**
* Updates topology version if current version is smaller than updated.
*
* @param updated Updated topology version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 50937a8,d4f95e5..5eacc36
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -1677,12 -1677,16 +1704,21 @@@ public class GridDhtPartitionsExchangeF
}
}
+ /**
+ * @param e Exception.
+ * @return {@code True} if local node should try reconnect in case of error.
+ */
+ public boolean reconnectOnError(Throwable e) {
+ return X.hasCause(e, IOException.class, IgniteClientDisconnectedCheckedException.class) &&
+ cctx.discovery().reconnectSupported();
+ }
+
/** {@inheritDoc} */
+ @Override public boolean isExchange() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
@Override public int compareTo(GridDhtPartitionsExchangeFuture fut) {
return exchId.compareTo(fut.exchId);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 84fb8e3,bd81518..e0a5c7c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@@ -317,8 -315,10 +317,10 @@@ public class GridServiceProcessor exten
busyLock.block();
+ U.shutdownNow(GridServiceProcessor.class, depExe, log);
+
if (!ctx.clientNode())
- ctx.event().removeLocalEventListener(topLsnr);
+ ctx.event().removeDiscoveryEventListener(topLsnr);
Collection<ServiceContextImpl> ctxs = new ArrayList<>();
@@@ -1401,7 -1399,7 +1401,7 @@@
return;
try {
- depExe.execute(new BusyRunnable() {
- depExe.submit(new DepRunnable() {
++ depExe.execute(new DepRunnable() {
@Override public void run0() {
onSystemCacheUpdated(deps);
}
@@@ -1582,18 -1582,13 +1582,18 @@@
}
else
return;
+
+ topVer = ((DiscoveryCustomEvent)evt).affinityTopologyVersion();
}
else
- topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
+ topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0);
- depExe.execute(new BusyRunnable() {
- depExe.submit(new DepRunnable() {
++ depExe.execute(new DepRunnable() {
@Override public void run0() {
- ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
+ // In case the cache instance isn't tracked by DiscoveryManager anymore.
+ discoCache.updateAlives(ctx.discovery());
+
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
if (oldest != null && oldest.isLocal()) {
final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 04b076d,02ba56a..feb3e48
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@@ -1100,6 -1183,15 +1186,16 @@@ class ClientImpl extends TcpDiscoveryIm
continue;
}
+ else {
- msg = queue.poll();
++ if (msg == null)
++ msg = queue.poll();
+
+ if (msg == null) {
+ mux.wait();
+
+ continue;
+ }
+ }
}
for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sndMsgLsnrs)
http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 682d2d7,6cdf465..01aa256
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@@ -698,11 -698,14 +698,14 @@@ public class IgniteClientReconnectCache
IgniteInternalFuture<Boolean> fut = GridTestUtils.runAsync(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try {
- Ignition.start(optimize(getConfiguration(getTestGridName(SRV_CNT))));
+ Ignition.start(optimize(getConfiguration(getTestIgniteInstanceName(SRV_CNT))));
- fail();
+ // Commented due to IGNITE-4473, because
+ // IgniteClientDisconnectedException won't
+ // be thrown, but client will reconnect.
+ // fail();
- return false;
+ return true;
}
catch (IgniteClientDisconnectedException e) {
log.info("Expected start error: " + e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------