You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/09 17:15:59 UTC

[50/50] incubator-ignite git commit: # ignite-901

# ignite-901


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fa007b1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fa007b1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fa007b1f

Branch: refs/heads/ignite-901
Commit: fa007b1f5e6d9a60827eef0d979bbd2959fb7cc3
Parents: e73e496
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jul 9 09:59:59 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jul 9 17:53:41 2015 +0300

----------------------------------------------------------------------
 .../internal/managers/GridManagerAdapter.java   |   6 +-
 .../discovery/GridDiscoveryManager.java         |  41 ++--
 .../processors/cache/GridCacheProcessor.java    |  10 +-
 .../cache/GridCacheSharedContext.java           |  24 +-
 .../GridDhtPartitionsExchangeFuture.java        |   4 -
 .../java/org/apache/ignite/spi/IgniteSpi.java   |  15 ++
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  13 +
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  77 ++++--
 .../tcp/internal/TcpDiscoveryNode.java          |  15 +-
 .../GridDeploymentManagerStopSelfTest.java      |  11 +
 .../loadtests/hashmap/GridCacheTestContext.java |   4 +-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      | 236 ++++++++++++++++++-
 .../query/h2/twostep/GridMergeIndex.java        |  29 +--
 .../h2/twostep/GridReduceQueryExecutor.java     |   3 +
 14 files changed, 401 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 9faa056..298ff24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -168,12 +168,14 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
 
     /** {@inheritDoc} */
     @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
-        // No-op.
+        for (T t : spis)
+            t.onClientDisconnected(reconnectFut);
     }
 
     /** {@inheritDoc} */
     @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
-        // No-op.
+        for (T t : spis)
+            t.onClientReconnected(clusterRestarted);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index c0d9f13..986a995 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -491,10 +491,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     assert node.isClient() : node;
 
                     ((IgniteKernal)ctx.grid()).onDisconnected();
-
-                    recordEvent(type, topVer, node, topSnapshot);
-
-                    return;
                 }
                 else if (type == EVT_CLIENT_NODE_RECONNECTED) {
                     assert locNode.isClient() : locNode;
@@ -506,16 +502,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                     ((IgniteKernal)ctx.grid()).onReconnected(clusterRestarted);
 
-                    recordEvent(type, topVer, node, topSnapshot);
-
                     ctx.gateway().onReconnected();
-
-                    if (log.isInfoEnabled())
-                        log.info("Client node reconnected to cluster: " + node);
-
-                    ackTopology(topVer, true);
-
-                    return;
                 }
 
                 discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg);
@@ -989,7 +976,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
         Collection<ClusterNode> rmtNodes = discoCache.remoteNodes();
 
-        Collection<ClusterNode> serverNodes = F.view(discoCache.allNodes(), F.not(clientFilter));
+        Collection<ClusterNode> srvNodes = F.view(discoCache.allNodes(), F.not(clientFilter));
 
         Collection<ClusterNode> clientNodes = F.view(discoCache.allNodes(), clientFilter);
 
@@ -1009,7 +996,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         double heap = U.heapSize(allNodes, 2);
 
         if (log.isQuiet())
-            U.quiet(false, topologySnapshotMessage(serverNodes.size(), clientNodes.size(), totalCpus, heap));
+            U.quiet(false, topologySnapshotMessage(srvNodes.size(), clientNodes.size(), totalCpus, heap));
 
         if (log.isDebugEnabled()) {
             String dbg = "";
@@ -1019,7 +1006,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 ">>> " + PREFIX + "." + U.nl() +
                 ">>> +----------------+" + U.nl() +
                 ">>> Grid name: " + (ctx.gridName() == null ? "default" : ctx.gridName()) + U.nl() +
-                ">>> Number of server nodes: " + serverNodes.size() + U.nl() +
+                ">>> Number of server nodes: " + srvNodes.size() + U.nl() +
                 ">>> Number of client nodes: " + clientNodes.size() + U.nl() +
                 (discoOrdered ? ">>> Topology version: " + topVer + U.nl() : "") +
                 ">>> Topology hash: 0x" + Long.toHexString(hash).toUpperCase() + U.nl();
@@ -1053,7 +1040,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             log.debug(dbg);
         }
         else if (log.isInfoEnabled())
-            log.info(topologySnapshotMessage(serverNodes.size(), clientNodes.size(), totalCpus, heap));
+            log.info(topologySnapshotMessage(srvNodes.size(), clientNodes.size(), totalCpus, heap));
     }
 
     /**
@@ -1063,10 +1050,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @param heap Heap size.
      * @return Topology snapshot message.
      */
-    private String topologySnapshotMessage(int serverNodesNum, int clientNodesNum, int totalCpus, double heap) {
+    private String topologySnapshotMessage(int srvNodesNum, int clientNodesNum, int totalCpus, double heap) {
         return PREFIX + " [" +
             (discoOrdered ? "ver=" + topSnap.get().topVer.topologyVersion() + ", " : "") +
-            "server nodes=" + serverNodesNum +
+            "server nodes=" + srvNodesNum +
             ", client nodes=" + clientNodesNum +
             ", CPUs=" + totalCpus +
             ", heap=" + heap + "GB" +
@@ -1917,6 +1904,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     break;
                 }
 
+                case EVT_CLIENT_NODE_DISCONNECTED: {
+                    if (log.isInfoEnabled())
+                        log.info("Client node disconnected from topology: " + node);
+
+                    break;
+                }
+
+                case EVT_CLIENT_NODE_RECONNECTED: {
+                    if (log.isInfoEnabled())
+                        log.info("Client node reconnected to topology: " + node);
+
+                    ackTopology(topVer.topologyVersion(), true);
+
+                    break;
+                }
+
                 case EVT_NODE_FAILED: {
                     // Check only if resolvers were configured.
                     if (hasRslvrs)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index ebbe639..61f7e58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -575,8 +575,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
 
-        sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(
-            ctx, ctx.config().getCacheStoreSessionListenerFactories()));
+        sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(ctx,
+            ctx.config().getCacheStoreSessionListenerFactories()));
 
         ctx.performance().add("Disable serializable transactions (set 'txSerializableEnabled' to false)",
             !ctx.config().getTransactionConfiguration().isTxSerializableEnabled());
@@ -1721,8 +1721,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             depMgr,
             exchMgr,
             ioMgr,
-            storeSesLsnrs,
-            jta
+            jta,
+            storeSesLsnrs
         );
     }
 
@@ -1733,8 +1733,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
+        // Collect dynamically started caches to a single object.
         Collection<DynamicCacheChangeRequest> reqs =
-            // Collect dynamically started caches to a single object.
             new ArrayList<>(registeredCaches.size() + registeredTemplates.size());
 
         boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 715d514..d0064f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -87,9 +87,15 @@ public class GridCacheSharedContext<K, V> {
     private Collection<CacheStoreSessionListener> storeSesLsnrs;
 
     /**
+     * @param kernalCtx  Context.
      * @param txMgr Transaction manager.
      * @param verMgr Version manager.
      * @param mvccMgr MVCC manager.
+     * @param depMgr Deployment manager.
+     * @param exchMgr Exchange manager.
+     * @param ioMgr IO manager.
+     * @param jtaMgr JTA manager.
+     * @param storeSesLsnrs Store session listeners.
      */
     public GridCacheSharedContext(
         GridKernalContext kernalCtx,
@@ -99,12 +105,12 @@ public class GridCacheSharedContext<K, V> {
         GridCacheDeploymentManager<K, V> depMgr,
         GridCachePartitionExchangeManager<K, V> exchMgr,
         GridCacheIoManager ioMgr,
-        Collection<CacheStoreSessionListener> storeSesLsnrs,
-        CacheJtaManagerAdapter jtaMgr
+        CacheJtaManagerAdapter jtaMgr,
+        Collection<CacheStoreSessionListener> storeSesLsnrs
     ) {
         this.kernalCtx = kernalCtx;
 
-        setManagers(txMgr, verMgr, mvccMgr, depMgr, exchMgr, ioMgr);
+        setManagers(txMgr, jtaMgr, verMgr, mvccMgr, depMgr, exchMgr, ioMgr);
 
         this.storeSesLsnrs = storeSesLsnrs;
 
@@ -114,6 +120,7 @@ public class GridCacheSharedContext<K, V> {
     }
 
     /**
+     * @param reconnectFut Reconnect future.
      * @throws IgniteCheckedException If failed.
      */
     void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
@@ -137,6 +144,7 @@ public class GridCacheSharedContext<K, V> {
         mgrs = new LinkedList<>();
 
         setManagers(txMgr,
+            jtaMgr,
             verMgr,
             mvccMgr,
             new GridCacheDeploymentManager<K, V>(),
@@ -149,7 +157,17 @@ public class GridCacheSharedContext<K, V> {
         }
     }
 
+    /**
+     * @param txMgr Transaction manager.
+     * @param verMgr Version manager.
+     * @param mvccMgr MVCC manager.
+     * @param depMgr Deployment manager.
+     * @param exchMgr Exchange manager.
+     * @param ioMgr IO manager.
+     * @param jtaMgr JTA manager.
+     */
     private void setManagers(IgniteTxManager txMgr,
+        CacheJtaManagerAdapter jtaMgr,
         GridCacheVersionManager verMgr,
         GridCacheMvccManager mvccMgr,
         GridCacheDeploymentManager<K, V> depMgr,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 0369eb9..38a0d55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -458,8 +458,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         if (isDone())
             return;
 
-        log.info("Init exchange: " + exchangeId());
-
         if (init.compareAndSet(false, true)) {
             if (isDone())
                 return;
@@ -1026,8 +1024,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         cctx.exchange().onExchangeDone(this, err);
 
         if (super.onDone(res, err) && !dummy && !forcePreload) {
-            log.info("Finished exchange: " + exchangeId() + ", err=" + err);
-
             if (log.isDebugEnabled())
                 log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + ']');
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java
index 968d88d..0f6ed5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi;
 
+import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
@@ -106,4 +107,18 @@ public interface IgniteSpi {
      * @throws IgniteSpiException Thrown in case of any error during SPI stop.
      */
     public void spiStop() throws IgniteSpiException;
+
+    /**
+     * Client node disconnected callback.
+     *
+     * @param reconnectFut Future that will be completed when client reconnected.
+     */
+    public void onClientDisconnected(IgniteFuture<?> reconnectFut);
+
+    /**
+     * Client node reconnected callback.
+     *
+     * @param clusterRestarted {@code True} if all cluster nodes restarted while client was disconnected.
+     */
+    public void onClientReconnected(boolean clusterRestarted);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index dd19203..a49d85a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.plugin.security.*;
 import org.apache.ignite.resources.*;
@@ -191,8 +192,20 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         spiCtx = new GridDummySpiContext(locNode, true, spiCtx);
     }
 
+    /** {@inheritDoc} */
+    @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onClientReconnected(boolean clusterRestarted) {
+        // No-op.
+    }
+
     /**
      * Inject ignite instance.
+     *
+     * @param ignite Ignite instance.
      */
     @IgniteInstanceResource
     protected void injectResources(Ignite ignite) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index b3793b1..8041a63 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -512,9 +512,18 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 tstamp = U.currentTimeMillis();
 
-                TcpDiscoveryAbstractMessage msg = recon ?
-                    new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId) :
-                    new TcpDiscoveryJoinRequestMessage(locNode, spi.collectExchangeData(getLocalNodeId()));
+                TcpDiscoveryAbstractMessage msg;
+
+                if (!recon) {
+                    TcpDiscoveryNode node = locNode;
+
+                    if (locNode.order() > 0)
+                        node = locNode.clientReconnectNode();
+
+                    msg = new TcpDiscoveryJoinRequestMessage(node, spi.collectExchangeData(getLocalNodeId()));
+                }
+                else
+                    msg = new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId);
 
                 msg.client(true);
 
@@ -977,6 +986,8 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             long timeout = join ? spi.joinTimeout : spi.netTimeout;
 
+            log.info("Will try to reconnect with timeout: " + timeout);
+
             long startTime = U.currentTimeMillis();
 
             try {
@@ -1020,6 +1031,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                                 TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
 
                                 if (res.creatorNodeId().equals(getLocalNodeId())) {
+                                    log.info("Reconnect status: " + res.success());
+
                                     if (res.success()) {
                                         msgWorker.addMessage(res);
 
@@ -1032,7 +1045,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                                         return;
                                     }
-                                    else // TODO IGNITE-901 reuse socket.
+                                    else
                                         return;
                                 }
                             }
@@ -1136,6 +1149,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                             break;
                         }
                         else if (state == ClientImpl.State.DISCONNECTED) {
+                            log.info("Rejoin timeout, will segment.");
+
                             state = ClientImpl.State.SEGMENTED;
 
                             notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
@@ -1172,6 +1187,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                                 }
                             }
                             else {
+                                log.info("Socket closed, will try to reconnect.");
+
                                 assert reconnector == null;
 
                                 final Reconnector reconnector = new Reconnector(join);
@@ -1187,30 +1204,40 @@ class ClientImpl extends TcpDiscoveryImpl {
                         reconnector = null;
 
                         if (spi.isClientReconnectDisabled()) {
-                            state = ClientImpl.State.SEGMENTED;
+                            if (state != ClientImpl.State.SEGMENTED && state != ClientImpl.State.STOPPED) {
+                                log.info("Reconnected failed, will segment.");
 
-                            notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+                                state = ClientImpl.State.SEGMENTED;
+
+                                notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+                            }
                         }
                         else {
-                            state = ClientImpl.State.DISCONNECTED;
+                            log.info("Reconnected failed, will try join.");
 
-                            nodeAdded = false;
+                            if (state != ClientImpl.State.DISCONNECTED) {
+                                state = ClientImpl.State.DISCONNECTED;
 
-                            IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
-                                null, "Failed to ping node, client node disconnected.");
+                                nodeAdded = false;
 
-                            for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
-                                GridFutureAdapter<Boolean> fut = e.getValue();
+                                IgniteClientDisconnectedCheckedException err =
+                                    new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, client node disconnected.");
 
-                                if (pingFuts.remove(e.getKey(), fut))
-                                    fut.onDone(err);
-                            }
+                                for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
+                                    GridFutureAdapter<Boolean> fut = e.getValue();
 
-                            notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
+                                    if (pingFuts.remove(e.getKey(), fut))
+                                        fut.onDone(err);
+                                }
+
+                                notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
+                            }
 
                             UUID newId = UUID.randomUUID();
 
-                            log.info("Change node id: " + newId + " " + locNode.attribute(IgniteNodeAttributes.ATTR_GRID_NAME));
+                            if (log.isInfoEnabled())
+                                log.info("Client will try to reconnect to cluster with new id " +
+                                    "[id=" + newId + ", prevId=" + locNode.id() + ']');
 
                             locNode.onClientDisconnected(newId);
 
@@ -1220,7 +1247,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     else {
                         TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg;
 
-                        if (joinLatch.getCount() > 0) { // TODO IGNITE-901.
+                        if (joining()) {
                             IgniteSpiException err = null;
 
                             if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage)
@@ -1231,7 +1258,13 @@ class ClientImpl extends TcpDiscoveryImpl {
                                 err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
 
                             if (err != null) {
-                                joinError(err);
+                                if (state == ClientImpl.State.DISCONNECTED) {
+                                    state = ClientImpl.State.SEGMENTED;
+
+                                    notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+                                }
+                                else
+                                    joinError(err);
 
                                 break;
                             }
@@ -1263,12 +1296,16 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             boolean join = state == ClientImpl.State.STARTING;
 
+            log.info("Try join topology with timeout: " + spi.joinTimeout);
+
             final Socket sock = joinTopology(false, spi.joinTimeout);
 
             if (sock == null) {
                 if (join)
                     joinError(new IgniteSpiException("Join process timed out."));
                 else {
+                    log.info("Send join request on rejoin failed, will segment.");
+
                     state = ClientImpl.State.SEGMENTED;
 
                     notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
@@ -1284,7 +1321,7 @@ class ClientImpl extends TcpDiscoveryImpl {
             if (spi.joinTimeout > 0) {
                 timer.schedule(new TimerTask() {
                     @Override public void run() {
-                        if (joinLatch.getCount() > 0)
+                        if (joining())
                             queue.add(JOIN_TIMEOUT);
                     }
                 }, spi.joinTimeout);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 1e849f4..b8f1a86 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -449,9 +449,18 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
      */
     public void onClientDisconnected(UUID newId) {
         id = newId;
-        order = 0;
-        intOrder = 0;
-        visible = false;
+    }
+
+    /**
+     * @return Copy of local node for client reconnect request.
+     */
+    public TcpDiscoveryNode clientReconnectNode() {
+        TcpDiscoveryNode node = new TcpDiscoveryNode(id, addrs, hostNames, discPort, metricsProvider, ver);
+
+        node.attrs = attrs;
+        node.clientRouterNodeId = clientRouterNodeId;
+
+        return node;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
index 9780080..b8f9ce1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.managers.deployment;
 
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.resource.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.jdk.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.deployment.*;
@@ -95,5 +96,15 @@ public class GridDeploymentManagerStopSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public boolean unregister(String rsrcName) { return false; }
+
+        /** {@inheritDoc} */
+        @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onClientReconnected(boolean clusterRestarted) {
+            // No-op.
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index e9d7a45..9a883b3 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -55,8 +55,8 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
                 new GridCacheDeploymentManager<K, V>(),
                 new GridCachePartitionExchangeManager<K, V>(),
                 new GridCacheIoManager(),
-                null,
-                new CacheNoopJtaManager()
+                new CacheNoopJtaManager(),
+                null
             ),
             defaultCacheConfiguration(),
             CacheType.USER,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index ba38dfc..5838481 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -111,6 +111,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     /** */
     private IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite;
 
+    /** */
+    private boolean reconnectDisabled;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -159,6 +162,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         disco.setJoinTimeout(joinTimeout);
         disco.setNetworkTimeout(netTimeout);
 
+        disco.setClientReconnectDisabled(reconnectDisabled);
+
         disco.afterWrite(afterWrite);
 
         cfg.setDiscoverySpi(disco);
@@ -633,6 +638,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     public void testClientSegmentation() throws Exception {
         clientsPerSrv = 1;
 
+        reconnectDisabled = true;
+
         startServerNodes(3);
         startClientNodes(3);
 
@@ -656,6 +663,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         final TcpDiscoverySpi disco = (TcpDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi();
 
         try {
+            log.info("Fail server: " + 2);
+
             failServer(2);
 
             await(srvFailedLatch);
@@ -888,8 +897,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         try {
             startClientNodes(1);
 
-            assertEquals(G.ignite("server-0").cluster().localNode().id(), ((TcpDiscoveryNode) G.ignite("client-0")
-                .cluster().localNode()).clientRouterNodeId());
+            assertEquals(G.ignite("server-0").cluster().localNode().id(),
+                ((TcpDiscoveryNode) G.ignite("client-0").cluster().localNode()).clientRouterNodeId());
 
             checkNodes(2, 1);
 
@@ -1278,8 +1287,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         final CountDownLatch disconnectLatch = new CountDownLatch(1);
 
         client.events().localListen(new IgnitePredicate<Event>() {
-            @Override
-            public boolean apply(Event evt) {
+            @Override public boolean apply(Event evt) {
                 info("Client event: " + evt);
 
                 if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
@@ -1334,22 +1342,230 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testClientReconnectDisabled() throws Exception {
-        // TODO IGNTIE-901.
+    public void testClientFailReconnectDisabled() throws Exception {
+        reconnectDisabled = true;
+
+        startServerNodes(1);
+
+        startClientNodes(1);
+
+        Ignite srv = G.ignite("server-0");
+
+        TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+
+        Ignite client = G.ignite("client-0");
+
+        final CountDownLatch segmentedLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_NODE_SEGMENTED)
+                    segmentedLatch.countDown();
+
+                return false;
+            }
+        }, EVT_NODE_SEGMENTED);
+
+        srvFailedLatch = new CountDownLatch(1);
+
+        attachListeners(1, 0);
+
+        log.info("Fail client node.");
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        assertTrue(srvFailedLatch.await(5000, MILLISECONDS));
+        assertTrue(segmentedLatch.await(5000, MILLISECONDS));
+
+        checkNodes(1, 0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectSegmentedAfterJoinTimeoutServerFailed() throws Exception {
+        reconnectSegmentedAfterJoinTimeout(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectSegmentedAfterJoinTimeoutNetworkError() throws Exception {
+        reconnectSegmentedAfterJoinTimeout(false);
+    }
+
+    /**
+     * @param failSrv If {@code true} fails server, otherwise server does not send join message.
+     * @throws Exception If failed.
+     */
+    private void reconnectSegmentedAfterJoinTimeout(boolean failSrv) throws Exception {
+        netTimeout = 4000;
+        joinTimeout = 5000;
+
+        startServerNodes(1);
+
+        startClientNodes(1);
+
+        Ignite srv = G.ignite("server-0");
+        Ignite client = G.ignite("client-0");
+
+        TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+        TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi());
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+        final CountDownLatch segmentedLatch = new CountDownLatch(1);
+        final AtomicBoolean err = new AtomicBoolean(false);
+
+        if (!failSrv) {
+            srvFailedLatch = new CountDownLatch(1);
+
+            attachListeners(1, 0);
+        }
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    log.info("Disconnected event.");
+
+                    assertEquals(1, segmentedLatch.getCount());
+                    assertEquals(1, disconnectLatch.getCount());
+                    assertFalse(err.get());
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_NODE_SEGMENTED) {
+                    log.info("Segmented event.");
+
+                    assertEquals(1, segmentedLatch.getCount());
+                    assertEquals(0, disconnectLatch.getCount());
+                    assertFalse(err.get());
+
+                    segmentedLatch.countDown();
+                }
+                else {
+                    log.error("Unexpected event: " + evt);
+
+                    err.set(true);
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED, EVT_NODE_SEGMENTED);
+
+        if (failSrv) {
+            log.info("Fail server.");
+
+            failServer(0);
+        }
+        else {
+            log.info("Fail client connection.");
+
+            srvSpi.failClientReconnect.set(1_000_000);
+            srvSpi.failNodeAdded.set(1_000_000);
+
+            clientSpi.brakeConnection();
+        }
+
+        assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+
+        assertTrue(segmentedLatch.await(10_000, MILLISECONDS));
+
+        waitSegmented(client);
+
+        assertFalse(err.get());
+
+        if (!failSrv)
+            await(srvFailedLatch);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectClusterRestart() throws Exception {
+        netTimeout = 3000;
+        joinTimeout = 60_000;
+
+        clientIpFinder = new TcpDiscoveryVmIpFinder();
+
+        clientIpFinder.setAddresses(Collections.singleton("localhost:47500..47509"));
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+        final AtomicBoolean err = new AtomicBoolean(false);
+
+        startServerNodes(1);
+
+        startClientNodes(1);
+
+        Ignite srv = G.ignite("server-0");
+        Ignite client = G.ignite("client-0");
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override
+            public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    log.info("Disconnected event.");
+
+                    assertEquals(1, reconnectLatch.getCount());
+                    assertEquals(1, disconnectLatch.getCount());
+
+                    disconnectLatch.countDown();
+                } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    log.info("Reconnected event.");
+
+                    assertEquals(1, reconnectLatch.getCount());
+                    assertEquals(0, disconnectLatch.getCount());
+                    assertFalse(err.get());
+
+                    reconnectLatch.countDown();
+                } else {
+                    log.error("Unexpected event: " + evt);
+
+                    err.set(true);
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED, EVT_NODE_SEGMENTED);
+
+        log.info("Stop server.");
+
+        srv.close();
+
+        assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+
+        srvNodeIds.clear();
+        srvIdx.set(0);
+
+        Thread.sleep(3000);
+
+        log.info("Restart server.");
+
+        startServerNodes(1);
+
+        assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+
+        assertFalse(err.get());
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testDisconnectAfterNetworkTimeout() throws Exception {
-        // TODO IGNTIE-901.
     }
 
     /**
+     * @param ignite Ignite.
      * @throws Exception If failed.
      */
-    public void testReconnectSegmentedAfterJoinTimeout() throws Exception {
-        // TODO IGNTIE-901.
+    private void waitSegmented(final Ignite ignite) throws Exception {
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return IgniteState.STOPPED_ON_SEGMENTATION == Ignition.state(ignite.name());
+            }
+        }, 5000);
+
+        assertEquals(IgniteState.STOPPED_ON_SEGMENTATION, Ignition.state(ignite.name()));
     }
 
     /**
@@ -1567,7 +1783,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
      * @throws InterruptedException If interrupted.
      */
     private void await(CountDownLatch latch) throws InterruptedException {
-        assertTrue("Latch count: " + latch.getCount(), latch.await(10000, MILLISECONDS));
+        assertTrue("Latch count: " + latch.getCount(), latch.await(10_000, MILLISECONDS));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 05677a4..2b2996d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -26,7 +26,7 @@ import org.h2.table.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
-import javax.cache.CacheException;
+import javax.cache.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -93,6 +93,9 @@ public abstract class GridMergeIndex extends BaseIndex {
             throw new IllegalStateException();
     }
 
+    /**
+     * @param e Error.
+     */
     public void fail(final CacheException e) {
         for (UUID nodeId0 : remainingRows.keySet()) {
             addPage0(new GridResultPage(null, nodeId0, null) {
@@ -100,8 +103,7 @@ public abstract class GridMergeIndex extends BaseIndex {
                     return true;
                 }
 
-                @Override
-                public void fetchNextPage() {
+                @Override public void fetchNextPage() {
                     throw e;
                 }
             });
@@ -111,23 +113,12 @@ public abstract class GridMergeIndex extends BaseIndex {
     /**
      * @param nodeId Node ID.
      */
-    public void fail(@Nullable UUID nodeId) {
-        if (nodeId != null) {
-            addPage0(new GridResultPage(null, nodeId, null) {
-                @Override public boolean isFail() {
-                    return true;
-                }
-            });
-        }
-        else {
-            for (UUID nodeId0 : remainingRows.keySet()) {
-                addPage0(new GridResultPage(null, nodeId0, null) {
-                    @Override public boolean isFail() {
-                        return true;
-                    }
-                });
+    public void fail(UUID nodeId) {
+        addPage0(new GridResultPage(null, nodeId, null) {
+            @Override public boolean isFail() {
+                return true;
             }
-        }
+        });
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fa007b1f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 8f03681..cde3288 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -1162,6 +1162,9 @@ public class GridReduceQueryExecutor {
                 tbl.getScanIndex(null).fail(nodeId);
         }
 
+        /**
+         * @param e Error.
+         */
         void disconnected(CacheException e) {
             if (!state.compareAndSet(null, e))
                 return;