You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/12 17:04:46 UTC

[48/50] incubator-ignite git commit: # ignite-883

# ignite-883


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8870a177
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8870a177
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8870a177

Branch: refs/heads/ignite-883-1
Commit: 8870a177acaa16f6757615cab6017b21aa274c01
Parents: 16f3d32
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 12 10:05:22 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 12 17:20:14 2015 +0300

----------------------------------------------------------------------
 .../communication/tcp/TcpCommunicationSpi.java  |   2 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 232 ++++++++++++-------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 170 +++++++++++---
 .../ipfinder/TcpDiscoveryIpFinderAdapter.java   |  34 ++-
 .../TcpDiscoveryMulticastIpFinder.java          |  19 +-
 .../messages/TcpDiscoveryAbstractMessage.java   |  10 +-
 .../distributed/IgniteCacheManyClientsTest.java |  65 ++++--
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  42 +++-
 8 files changed, 412 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index f19e25b..9e38788 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2028,7 +2028,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     if (X.hasCause(e, SocketTimeoutException.class))
                         LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " +
-                            "configuration property) [addr=" + addr + ", connTimeout" + connTimeout + ']');
+                            "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']');
 
                     if (errs == null)
                         errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 23e6f88..d8108e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -311,7 +311,7 @@ class ClientImpl extends TcpDiscoveryImpl {
             for (ClusterNode n : rmts) {
                 rmtNodes.remove(n.id());
 
-                Collection<ClusterNode> top = updateTopologyHistory(topVer + 1);
+                Collection<ClusterNode> top = updateTopologyHistory(topVer + 1, null);
 
                 lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, new TreeMap<>(topHist), null);
             }
@@ -348,13 +348,14 @@ class ClientImpl extends TcpDiscoveryImpl {
 
     /**
      * @param recon {@code True} if reconnects.
+     * @param timeout Timeout.
      * @return Opened socket or {@code null} if timeout.
      * @throws InterruptedException If interrupted.
      * @throws IgniteSpiException If failed.
      * @see TcpDiscoverySpi#joinTimeout
      */
     @SuppressWarnings("BusyWait")
-    @Nullable private Socket joinTopology(boolean recon) throws IgniteSpiException, InterruptedException {
+    @Nullable private Socket joinTopology(boolean recon, long timeout) throws IgniteSpiException, InterruptedException {
         Collection<InetSocketAddress> addrs = null;
 
         long startTime = U.currentTimeMillis();
@@ -371,11 +372,11 @@ class ClientImpl extends TcpDiscoveryImpl {
                         log.debug("Resolved addresses from IP finder: " + addrs);
                 }
                 else {
-                    U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + spi.ipFinder);
-
-                    if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > spi.joinTimeout)
+                    if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout)
                         return null;
 
+                    U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + spi.ipFinder);
+
                     Thread.sleep(2000);
                 }
             }
@@ -421,13 +422,13 @@ class ClientImpl extends TcpDiscoveryImpl {
             }
 
             if (addrs.isEmpty()) {
-                if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > spi.joinTimeout)
+                if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout)
                     return null;
 
-                Thread.sleep(2000);
-
                 U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " +
                     "in 2000ms): " + addrs0);
+
+                Thread.sleep(2000);
             }
         }
     }
@@ -541,16 +542,17 @@ class ClientImpl extends TcpDiscoveryImpl {
 
     /**
      * @param topVer New topology version.
+     * @param msg Discovery message.
      * @return Latest topology snapshot.
      */
-    private NavigableSet<ClusterNode> updateTopologyHistory(long topVer) {
+    private NavigableSet<ClusterNode> updateTopologyHistory(long topVer, @Nullable TcpDiscoveryAbstractMessage msg) {
         this.topVer = topVer;
 
         NavigableSet<ClusterNode> allNodes = allVisibleNodes();
 
         if (!topHist.containsKey(topVer)) {
             assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
-                "lastVer=" + topHist.lastKey() + ", newVer=" + topVer;
+                "lastVer=" + topHist.lastKey() + ", newVer=" + topVer + ", locNode=" + locNode + ", msg=" + msg;
 
             topHist.put(topVer, allNodes);
 
@@ -619,6 +621,17 @@ class ClientImpl extends TcpDiscoveryImpl {
     }
 
     /**
+     * @param err Error to store in {@code joinErr} before {@code joinLatch} is counted down.
+     */
+    private void joinError(IgniteSpiException err) {
+        assert err != null;
+
+        joinErr = err;
+
+        joinLatch.countDown();
+    }
+
+    /**
      * Heartbeat sender.
      */
     private class HeartbeatSender extends TimerTask {
@@ -727,7 +740,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                         spi.stats.onMessageReceived(msg);
 
-                        if (spi.ensured(msg))
+                        if (spi.ensured(msg) && joinLatch.getCount() == 0L)
                             lastMsgId = msg.id();
 
                         msgWorker.addMessage(msg);
@@ -866,11 +879,16 @@ class ClientImpl extends TcpDiscoveryImpl {
         /** */
         private volatile Socket sock;
 
+        /** */
+        private boolean join;
+
         /**
-         *
+         * @param join {@code True} if reconnects during join.
          */
-        protected Reconnector() {
+        protected Reconnector(boolean join) {
             super(spi.ignite().name(), "tcp-client-disco-msg-worker", log);
+
+            this.join = join;
         }
 
         /**
@@ -888,51 +906,94 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             boolean success = false;
 
+            Exception err = null;
+
+            long timeout = join ? spi.joinTimeout : spi.netTimeout;
+
+            long startTime = U.currentTimeMillis();
+
             try {
-                sock = joinTopology(true);
+                while (true) {
+                    sock = joinTopology(true, timeout);
 
-                if (sock == null) {
-                    U.error(log, "Failed to reconnect to cluster: timeout.");
+                    if (sock == null) {
+                        if (join) { // Timed out while still joining: report a join error instead of only logging.
+                            joinError(new IgniteSpiException("Join process timed out, connection failed and " +
+                                "failed to reconnect (consider increasing 'joinTimeout' configuration property) " +
+                                "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+                        }
+                        else
+                            U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
+                                "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
 
-                    return;
-                }
+                        return;
+                    }
 
-                if (isInterrupted())
-                    throw new InterruptedException();
+                    if (isInterrupted())
+                        throw new InterruptedException();
 
-                InputStream in = new BufferedInputStream(sock.getInputStream());
+                    int oldTimeout = 0;
 
-                sock.setKeepAlive(true);
-                sock.setTcpNoDelay(true);
+                    try {
+                        oldTimeout = sock.getSoTimeout();
 
-                // Wait for
-                while (!isInterrupted()) {
-                    TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
+                        sock.setSoTimeout((int)spi.netTimeout);
 
-                    if (msg instanceof TcpDiscoveryClientReconnectMessage) {
-                        TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
+                        InputStream in = new BufferedInputStream(sock.getInputStream());
 
-                        if (res.creatorNodeId().equals(getLocalNodeId())) {
-                            if (res.success()) {
-                                msgWorker.addMessage(res);
+                        sock.setKeepAlive(true);
+                        sock.setTcpNoDelay(true);
 
-                                success = true;
-                            }
+                        // Wait for
+                        while (!isInterrupted()) {
+                            TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
 
-                            break;
+                            if (msg instanceof TcpDiscoveryClientReconnectMessage) {
+                                TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
+
+                                if (res.creatorNodeId().equals(getLocalNodeId())) {
+                                    if (res.success()) {
+                                        msgWorker.addMessage(res);
+
+                                        success = true;
+                                    }
+
+                                    return;
+                                }
+                            }
                         }
                     }
+                    catch (IOException | IgniteCheckedException e) {
+                        U.closeQuiet(sock);
+
+                        if (log.isDebugEnabled())
+                            log.error("Reconnect error [join=" + join + ", timeout=" + timeout + ']', e);
 
+                        if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout)
+                            throw e;
+                        else
+                            U.warn(log, "Failed to reconnect to cluster (will retry): " + e);
+                    }
+                    finally {
+                        if (success)
+                            sock.setSoTimeout(oldTimeout);
+                    }
                 }
             }
             catch (IOException | IgniteCheckedException e) {
+                err = e;
+
                 U.error(log, "Failed to reconnect", e);
             }
             finally {
                 if (!success) {
                     U.closeQuiet(sock);
 
-                    msgWorker.addMessage(SPI_RECONNECT_FAILED);
+                    if (join)
+                        joinError(new IgniteSpiException("Failed to connect to cluster, connection failed and failed " +
+                            "to reconnect.", err));
+                    else
+                        msgWorker.addMessage(SPI_RECONNECT_FAILED);
                 }
             }
         }
@@ -967,7 +1028,7 @@ class ClientImpl extends TcpDiscoveryImpl {
             spi.stats.onJoinStarted();
 
             try {
-                final Socket sock = joinTopology(false);
+                final Socket sock = joinTopology(false, spi.joinTimeout);
 
                 if (sock == null) {
                     joinErr = new IgniteSpiException("Join process timed out.");
@@ -981,12 +1042,14 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 sockWriter.setSocket(sock);
 
-                timer.schedule(new TimerTask() {
-                    @Override public void run() {
-                        if (joinLatch.getCount() > 0)
-                            queue.add(JOIN_TIMEOUT);
-                    }
-                }, spi.netTimeout);
+                if (spi.joinTimeout > 0) {
+                    timer.schedule(new TimerTask() {
+                        @Override public void run() {
+                            if (joinLatch.getCount() > 0)
+                                queue.add(JOIN_TIMEOUT);
+                        }
+                    }, spi.joinTimeout);
+                }
 
                 sockReader.setSocket(sock, locNode.clientRouterNodeId());
 
@@ -996,8 +1059,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                     if (msg == JOIN_TIMEOUT) {
                         if (joinLatch.getCount() > 0) {
                             joinErr = new IgniteSpiException("Join process timed out, did not receive response for " +
-                                "join request (consider increasing 'networkTimeout' configuration property) " +
-                                "[networkTimeout=" + spi.netTimeout + ", sock=" + sock +']');
+                                "join request (consider increasing 'joinTimeout' configuration property) " +
+                                "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock +']');
 
                             joinLatch.countDown();
 
@@ -1021,30 +1084,23 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (((SocketClosedMessage)msg).sock == currSock) {
                             currSock = null;
 
-                            if (joinLatch.getCount() > 0) {
-                                joinErr = new IgniteSpiException("Failed to connect to cluster: socket closed.");
+                            boolean join = joinLatch.getCount() > 0;
 
-                                joinLatch.countDown();
+                            if (spi.getSpiContext().isStopping() || segmented) {
+                                leaveLatch.countDown();
 
-                                break;
+                                if (join) {
+                                    joinError(new IgniteSpiException("Failed to connect to cluster: socket closed."));
+
+                                    break;
+                                }
                             }
                             else {
-                                if (spi.getSpiContext().isStopping() || segmented)
-                                    leaveLatch.countDown();
-                                else {
-                                    assert reconnector == null;
-
-                                    final Reconnector reconnector = new Reconnector();
-                                    this.reconnector = reconnector;
-                                    reconnector.start();
-
-                                    timer.schedule(new TimerTask() {
-                                        @Override public void run() {
-                                            if (reconnector.isAlive())
-                                                reconnector.cancel();
-                                        }
-                                    }, spi.netTimeout);
-                                }
+                                assert reconnector == null;
+
+                                final Reconnector reconnector = new Reconnector(join);
+                                this.reconnector = reconnector;
+                                reconnector.start();
                             }
                         }
                     }
@@ -1208,7 +1264,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                     locNode.order(topVer);
 
-                    notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer));
+                    notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer, msg));
 
                     joinErr = null;
 
@@ -1230,6 +1286,14 @@ class ClientImpl extends TcpDiscoveryImpl {
                     return;
                 }
 
+                if (!topHist.isEmpty() && msg.topologyVersion() <= topHist.lastKey()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Discarding node add finished message since topology already updated " +
+                            "[msg=" + msg + ", lastHistKey=" + topHist.lastKey() + ", node=" + node + ']');
+
+                    return;
+                }
+
                 long topVer = msg.topologyVersion();
 
                 node.order(topVer);
@@ -1238,7 +1302,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                 if (spi.locNodeVer.equals(node.version()))
                     node.version(spi.locNodeVer);
 
-                NavigableSet<ClusterNode> top = updateTopologyHistory(topVer);
+                NavigableSet<ClusterNode> top = updateTopologyHistory(topVer, msg);
 
                 if (!pending && joinLatch.getCount() > 0) {
                     if (log.isDebugEnabled())
@@ -1276,7 +1340,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     return;
                 }
 
-                NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
+                NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);
 
                 if (!pending && joinLatch.getCount() > 0) {
                     if (log.isDebugEnabled())
@@ -1319,7 +1383,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     return;
                 }
 
-                NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
+                NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);
 
                 if (!pending && joinLatch.getCount() > 0) {
                     if (log.isDebugEnabled())
@@ -1376,28 +1440,32 @@ class ClientImpl extends TcpDiscoveryImpl {
                 return;
 
             if (getLocalNodeId().equals(msg.creatorNodeId())) {
-                assert msg.success();
+                assert msg.success() : msg;
 
-                currSock = reconnector.sock;
+                if (reconnector != null) {
+                    currSock = reconnector.sock;
 
-                sockWriter.setSocket(currSock);
-                sockReader.setSocket(currSock, locNode.clientRouterNodeId());
+                    sockWriter.setSocket(currSock);
+                    sockReader.setSocket(currSock, locNode.clientRouterNodeId());
 
-                reconnector = null;
+                    reconnector = null;
 
-                pending = true;
+                    pending = true;
 
-                try {
-                    for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
-                        if (log.isDebugEnabled())
-                            log.debug("Process message on reconnect [msg=" + pendingMsg + ']');
+                    try {
+                        for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
+                            if (log.isDebugEnabled())
+                                log.debug("Process message on reconnect [msg=" + pendingMsg + ']');
 
-                        processDiscoveryMessage(pendingMsg);
+                            processDiscoveryMessage(pendingMsg);
+                        }
+                    }
+                    finally {
+                        pending = false;
                     }
                 }
-                finally {
-                    pending = false;
-                }
+                else if (log.isDebugEnabled())
+                    log.debug("Discarding reconnect message, reconnect is completed: " + msg);
             }
             else if (log.isDebugEnabled())
                 log.debug("Discarding reconnect message for another client: " + msg);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 65bea9f..9041557 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -41,7 +41,6 @@ import org.jsr166.*;
 
 import java.io.*;
 import java.net.*;
-import java.text.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -1192,7 +1191,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             if (node.id().equals(destNodeId)) {
                 Collection<TcpDiscoveryNode> allNodes = ring.allNodes();
-                Collection<TcpDiscoveryNode> topToSend = new ArrayList<>(allNodes.size());
+                Collection<TcpDiscoveryNode> topToSnd = new ArrayList<>(allNodes.size());
 
                 for (TcpDiscoveryNode n0 : allNodes) {
                     assert n0.internalOrder() != 0 : n0;
@@ -1202,10 +1201,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                     // There will be separate messages for nodes with greater
                     // internal order.
                     if (n0.internalOrder() < nodeAddedMsg.node().internalOrder())
-                        topToSend.add(n0);
+                        topToSnd.add(n0);
                 }
 
-                nodeAddedMsg.topology(topToSend);
+                nodeAddedMsg.topology(topToSnd);
                 nodeAddedMsg.messages(msgs, discardMsgId);
 
                 Map<Long, Collection<ClusterNode>> hist;
@@ -1646,6 +1645,108 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /**
+     * Discovery messages history used for client reconnect.
+     */
+    private class EnsuredMessageHistory {
+        /** Maximum number of messages kept in the history. */
+        private static final int MAX = 1024;
+
+        /** Pending messages. */
+        private final ArrayDeque<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+
+        /**
+         * @param msg Message to add.
+         */
+        void add(TcpDiscoveryAbstractMessage msg) {
+            assert spi.ensured(msg) : msg;
+
+            msgs.addLast(msg);
+
+            while (msgs.size() > MAX)
+                msgs.pollFirst();
+        }
+
+        /**
+         * Gets messages starting from provided ID (exclusive). If such
+         * message is not found, {@code null} is returned (this indicates
+         * a failure condition when it was already removed from queue).
+         *
+         * @param lastMsgId Last message ID received on client. {@code Null} if client did not finish connect procedure.
+         * @param node Client node.
+         * @return Collection of messages.
+         */
+        @Nullable Collection<TcpDiscoveryAbstractMessage> messages(@Nullable IgniteUuid lastMsgId,
+            TcpDiscoveryNode node)
+        {
+            assert node != null && node.isClient() : node;
+
+            if (lastMsgId == null) {
+                // Client connection failed before it received TcpDiscoveryNodeAddedMessage.
+                List<TcpDiscoveryAbstractMessage> res = null;
+
+                for (TcpDiscoveryAbstractMessage msg : msgs) {
+                    if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+                        if (node.id().equals(((TcpDiscoveryNodeAddedMessage) msg).node().id()))
+                            res = new ArrayList<>(msgs.size());
+                    }
+
+                    if (res != null)
+                        res.add(prepare(msg, node.id()));
+                }
+
+                if (log.isDebugEnabled()) {
+                    if (res == null)
+                        log.debug("Failed to find node added message [node=" + node + ']');
+                    else
+                        log.debug("Found node added message [node=" + node + ", hist=" + res + ']');
+                }
+
+                return res;
+            }
+            else {
+                if (msgs.isEmpty())
+                    return Collections.emptyList();
+
+                Collection<TcpDiscoveryAbstractMessage> cp = new ArrayList<>(msgs.size());
+
+                boolean skip = true;
+
+                for (TcpDiscoveryAbstractMessage msg : msgs) {
+                    if (skip) {
+                        if (msg.id().equals(lastMsgId))
+                            skip = false;
+                    }
+                    else
+                        cp.add(prepare(msg, node.id()));
+                }
+
+                cp = !skip ? cp : null;
+
+                if (log.isDebugEnabled()) {
+                    if (cp == null)
+                        log.debug("Failed to find messages history [node=" + node + ", lastMsgId=" + lastMsgId + ']');
+                    else
+                        log.debug("Found messages history [node=" + node + ", hist=" + cp + ']');
+                }
+
+                return cp;
+            }
+        }
+
+        /**
+         * @param msg Message.
+         * @param destNodeId Client node ID.
+         * @return Prepared message.
+         */
+        private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) {
+            if (msg instanceof TcpDiscoveryNodeAddedMessage)
+                prepareNodeAddedMessage(msg, destNodeId, null, null);
+
+            return msg;
+        }
+    }
+
+    /**
      * Pending messages container.
      */
     private static class PendingMessages {
@@ -1678,33 +1779,27 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * Gets messages starting from provided ID (exclusive). If such
-         * message is not found, {@code null} is returned (this indicates
-         * a failure condition when it was already removed from queue).
+         * Resets pending messages.
          *
-         * @param lastMsgId Last message ID.
-         * @return Collection of messages.
+         * @param msgs Messages to put into the container (may be {@code null}).
+         * @param discardId Discarded message ID.
          */
-        @Nullable Collection<TcpDiscoveryAbstractMessage> messages(IgniteUuid lastMsgId) {
-            assert lastMsgId != null;
-
-            if (msgs.isEmpty())
-                return Collections.emptyList();
+        void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId) {
+            this.msgs.clear();
 
-            Collection<TcpDiscoveryAbstractMessage> cp = new ArrayList<>(msgs.size());
+            if (msgs != null)
+                this.msgs.addAll(msgs);
 
-            boolean skip = true;
+            this.discardId = discardId;
+        }
 
-            for (TcpDiscoveryAbstractMessage msg : msgs) {
-                if (skip) {
-                    if (msg.id().equals(lastMsgId))
-                        skip = false;
-                }
-                else
-                    cp.add(msg);
-            }
+        /**
+         * Clears pending messages.
+         */
+        void clear() {
+            msgs.clear();
 
-            return !skip ? cp : null;
+            discardId = null;
         }
 
         /**
@@ -1728,6 +1823,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Pending messages. */
         private final PendingMessages pendingMsgs = new PendingMessages();
 
+        /** Messages history used for client reconnect. */
+        private final EnsuredMessageHistory msgHist = new EnsuredMessageHistory();
+
         /** Last message that updated topology. */
         private TcpDiscoveryAbstractMessage lastMsg;
 
@@ -1794,6 +1892,9 @@ class ServerImpl extends TcpDiscoveryImpl {
             else
                 assert false : "Unknown message type: " + msg.getClass().getSimpleName();
 
+            if (spi.ensured(msg))
+                msgHist.add(msg);
+
             spi.stats.onMessageProcessingFinished(msg);
         }
 
@@ -2130,6 +2231,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                             onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']',
                                 e);
 
+                            log.error("Will resend [msg=" + msg + ", e=" + e + ']');
+
                             if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
                                 ackTimeout0 *= 2;
 
@@ -2619,11 +2722,15 @@ class ServerImpl extends TcpDiscoveryImpl {
                 node.aliveCheck(spi.maxMissedClientHbs);
 
                 if (isLocalNodeCoordinator()) {
-                    Collection<TcpDiscoveryAbstractMessage> pending = pendingMsgs.messages(msg.lastMessageId());
+                    Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node);
 
                     if (pending != null) {
                         msg.pendingMessages(pending);
                         msg.success(true);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Accept client reconnect, restored pending messages " +
+                                "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
                     }
                     else {
                         if (log.isDebugEnabled())
@@ -2836,7 +2943,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                             topHist.clear();
                             topHist.putAll(msg.topologyHistory());
 
-                            pendingMsgs.discard(msg.discardedMessageId());
+                            // Restore pending messages.
+                            pendingMsgs.reset(msg.messages(), msg.discardedMessageId());
 
                             // Clear data to minimize message size.
                             msg.messages(null, null);
@@ -3094,6 +3202,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (log.isDebugEnabled())
                     log.debug("Removed node from topology: " + leftNode);
 
+                // Clear pending messages map.
+                if (!ring.hasRemoteNodes())
+                    pendingMsgs.clear();
+
                 long topVer;
 
                 if (locNodeCoord) {
@@ -3257,6 +3369,10 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 assert node != null;
 
+                // No remote nodes left in the ring: clear pending messages map.
+                if (!ring.hasRemoteNodes())
+                    pendingMsgs.clear();
+
                 long topVer;
 
                 if (locNodeCoord) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java
index 99a2cdc..4d62ff2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java
@@ -17,9 +17,13 @@
 
 package org.apache.ignite.spi.discovery.tcp.ipfinder;
 
+import org.apache.ignite.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.*;
 
 import java.net.*;
 import java.util.*;
@@ -35,6 +39,11 @@ public abstract class TcpDiscoveryIpFinderAdapter implements TcpDiscoveryIpFinde
     @GridToStringExclude
     private volatile IgniteSpiContext spiCtx;
 
+    /** Ignite instance. */
+    @IgniteInstanceResource
+    @GridToStringExclude
+    protected Ignite ignite;
+
     /** {@inheritDoc} */
     @Override public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException {
         this.spiCtx = spiCtx;
@@ -47,7 +56,8 @@ public abstract class TcpDiscoveryIpFinderAdapter implements TcpDiscoveryIpFinde
 
     /** {@inheritDoc} */
     @Override public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
-        registerAddresses(addrs);
+        if (!discoveryClientMode())
+            registerAddresses(addrs);
     }
 
     /** {@inheritDoc} */
@@ -77,6 +87,28 @@ public abstract class TcpDiscoveryIpFinderAdapter implements TcpDiscoveryIpFinde
     }
 
     /**
+     * @return {@code True} if TCP discovery works in client mode.
+     */
+    protected boolean discoveryClientMode() {
+        boolean clientMode;
+
+        Ignite ignite0 = ignite;
+
+        if (ignite0 != null) { // Can be null if used in tests without starting Ignite.
+            DiscoverySpi discoSpi = ignite0.configuration().getDiscoverySpi();
+
+            if (!(discoSpi instanceof TcpDiscoverySpi))
+                throw new IgniteSpiException("TcpDiscoveryIpFinder should be used with TcpDiscoverySpi: " + discoSpi);
+
+            clientMode = ignite0.configuration().isClientMode() && !((TcpDiscoverySpi)discoSpi).isForceServerMode();
+        }
+        else
+            clientMode = false;
+
+        return clientMode;
+    }
+
+    /**
      * @return SPI context.
      */
     protected IgniteSpiContext spiContext() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
index a992620..8e5a1fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
@@ -85,11 +85,6 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
     @LoggerResource
     private IgniteLogger log;
 
-    /** Ignite instance . */
-    @IgniteInstanceResource
-    @GridToStringExclude
-    private Ignite ignite;
-
     /** Multicast IP address as string. */
     private String mcastGrp = DFLT_MCAST_GROUP;
 
@@ -256,19 +251,7 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
                 "(it is recommended in production to specify at least one address in " +
                 "TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)");
 
-        boolean clientMode;
-
-        if (ignite != null) { // Can be null if used in tests without starting Ignite.
-            DiscoverySpi discoSpi = ignite.configuration().getDiscoverySpi();
-
-            if (!(discoSpi instanceof TcpDiscoverySpi))
-                throw new IgniteSpiException("TcpDiscoveryMulticastIpFinder should be used with " +
-                    "TcpDiscoverySpi: " + discoSpi);
-
-            clientMode = ((TcpDiscoverySpi)discoSpi).isClientMode();
-        }
-        else
-            clientMode = false;
+        boolean clientMode = discoveryClientMode();
 
         InetAddress mcastAddr;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 145b518..21dbf4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -41,7 +41,7 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
     protected static final int CLIENT_RECON_SUCCESS_FLAG_POS = 2;
 
     /** Sender of the message (transient). */
-    private transient UUID senderNodeId;
+    private transient UUID sndNodeId;
 
     /** Message ID. */
     private IgniteUuid id;
@@ -99,16 +99,16 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
      * @return Sender node ID.
      */
     public UUID senderNodeId() {
-        return senderNodeId;
+        return sndNodeId;
     }
 
     /**
      * Sets sender node ID.
      *
-     * @param senderNodeId Sender node ID.
+     * @param sndNodeId Sender node ID.
      */
-    public void senderNodeId(UUID senderNodeId) {
-        this.senderNodeId = senderNodeId;
+    public void senderNodeId(UUID sndNodeId) {
+        this.sndNodeId = sndNodeId;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
index 77ddd40..4fb4387 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
@@ -63,18 +63,13 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
         ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setLocalPortRange(200);
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setJoinTimeout(2 * 60_000);
 
         if (!clientDiscovery)
             ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
 
         cfg.setClientMode(client);
 
-        if (client) {
-//            cfg.setPublicThreadPoolSize(1);
-//            cfg.setPeerClassLoadingThreadPoolSize(1);
-//            cfg.setIgfsThreadPoolSize(1);
-        }
-
         CacheConfiguration ccfg = new CacheConfiguration();
 
         ccfg.setCacheMode(PARTITIONED);
@@ -197,43 +192,62 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
         try {
             IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    try (Ignite ignite = startGrid(idx.getAndIncrement())) {
-                        log.info("Started node: " + ignite.name());
+                    boolean counted = false;
 
-                        assertTrue(ignite.configuration().isClientMode());
+                    try {
+                        int nodeIdx = idx.getAndIncrement();
 
-                        IgniteCache<Object, Object> cache = ignite.cache(null);
+                        Thread.currentThread().setName("client-thread-node-" + nodeIdx);
 
-                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+                        try (Ignite ignite = startGrid(nodeIdx)) {
+                            log.info("Started node: " + ignite.name());
 
-                        int iter = 0;
+                            assertTrue(ignite.configuration().isClientMode());
 
-                        Integer key = rnd.nextInt(0, 1000);
+                            IgniteCache<Object, Object> cache = ignite.cache(null);
 
-                        cache.put(key, iter++);
+                            ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                        assertNotNull(cache.get(key));
+                            int iter = 0;
 
-                        latch.countDown();
-
-                        while (!stop.get()) {
-                            key = rnd.nextInt(0, 1000);
+                            Integer key = rnd.nextInt(0, 1000);
 
                             cache.put(key, iter++);
 
                             assertNotNull(cache.get(key));
 
-                            Thread.sleep(1);
+                            latch.countDown();
+
+                            counted = true;
+
+                            while (!stop.get()) {
+                                key = rnd.nextInt(0, 1000);
+
+                                cache.put(key, iter++);
+
+                                assertNotNull(cache.get(key));
+
+                                Thread.sleep(1);
+                            }
+
+                            log.info("Stopping node: " + ignite.name());
                         }
 
-                        log.info("Stopping node: " + ignite.name());
+                        return null;
                     }
+                    catch (Throwable e) {
+                        log.error("Unexpected error in client thread: " + e, e);
 
-                    return null;
+                        throw e;
+                    }
+                    finally {
+                        if (!counted)
+                            latch.countDown();
+                    }
                 }
             }, THREADS, "client-thread");
 
-            latch.await();
+            assertTrue(latch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
 
             log.info("All clients started.");
 
@@ -245,6 +259,11 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
 
             fut.get();
         }
+        catch (Throwable e) {
+            log.error("Unexpected error: " + e, e);
+
+            throw e;
+        }
         finally {
             stop.set(true);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 44fe299..8147958 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -193,8 +193,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     *
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testJoinTimeout() throws Exception {
         clientIpFinder = new TcpDiscoveryVmIpFinder();
@@ -544,8 +543,6 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testClientReconnectTopologyChange2() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-998");
-
         maxMissedClientHbs = 100;
 
         clientsPerSrv = 1;
@@ -1001,6 +998,24 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testJoinError2() throws Exception {
+        startServerNodes(1);
+
+        Ignite ignite = G.ignite("server-0");
+
+        TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi());
+
+        srvSpi.failNodeAddedMessage();
+        srvSpi.failClientReconnectMessage();
+
+        startClientNodes(1);
+
+        checkNodes(1, 1);
+    }
+
+    /**
      * @param clientIdx Client index.
      * @param srvIdx Server index.
      * @throws Exception In case of error.
@@ -1251,6 +1266,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         /** */
         private AtomicInteger failNodeAdded = new AtomicInteger();
 
+        /** Client reconnect message writes are failed while this counter is positive. */
+        private AtomicInteger failClientReconnect = new AtomicInteger();
+
         /**
          * @param lock Lock.
          */
@@ -1276,6 +1294,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         }
 
         /**
+         * Makes the next write of a client reconnect message close the socket.
+         */
+        void failClientReconnectMessage() {
+            failClientReconnect.set(1);
+        }
+
+        /**
          * @param isPause Is lock.
          * @param locks Locks.
          */
@@ -1293,7 +1318,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
             GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException {
             waitFor(writeLock);
 
-            if (msg instanceof TcpDiscoveryNodeAddedMessage && failNodeAdded.getAndDecrement() > 0) {
+            boolean fail = false;
+
+            if (msg instanceof TcpDiscoveryNodeAddedMessage)
+                fail = failNodeAdded.getAndDecrement() > 0;
+            else if (msg instanceof TcpDiscoveryClientReconnectMessage)
+                fail = failClientReconnect.getAndDecrement() > 0;
+
+            if (fail) {
                 log.info("Close socket on message write [msg=" + msg + "]");
 
                 sock.close();