You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/30 14:38:22 UTC

[8/9] ignite git commit: ignite-4003 Async outgoing connections for communication SPI

http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 42879b7..9622c84 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -35,6 +35,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -46,8 +47,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
@@ -66,7 +65,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.ipc.IpcEndpoint;
 import org.apache.ignite.internal.util.ipc.IpcToNioAdapter;
@@ -78,6 +77,7 @@ import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter;
 import org.apache.ignite.internal.util.nio.GridDirectParser;
 import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
 import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
 import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory;
 import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
 import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
@@ -90,9 +90,7 @@ import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
 import org.apache.ignite.internal.util.nio.GridShmemCommunicationClient;
 import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
-import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler;
 import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
-import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -106,7 +104,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -136,7 +133,6 @@ import org.jsr166.LongAdder8;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
 
 /**
  * <tt>TcpCommunicationSpi</tt> is default communication SPI which uses
@@ -297,10 +293,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /** Connection index meta for session. */
     private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey();
+    /** Recovery descriptor meta key. */
+    private static final int RECOVERY_DESC_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
 
     /** Message tracker meta for session. */
     private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
 
+    /** Connection context meta key. */
+    private static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
     /**
      * Default local port range (value is <tt>100</tt>).
      * See {@link #setLocalPortRange(int)} for details.
@@ -335,6 +336,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Handshake message type. */
     public static final short HANDSHAKE_MSG_TYPE = -3;
 
+    /** Ignite header message. */
+    private static final Message IGNITE_HEADER_MSG = new IgniteHeaderMessage();
+
+    /** Skip ack. For test purposes only. */
+    private boolean skipAck;
+
     /** */
     private ConnectGateway connectGate;
 
@@ -408,10 +415,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                         log.debug("Session was closed but there are unacknowledged messages, " +
                                             "will try to reconnect [rmtNode=" + outDesc.node().id() + ']');
 
-                                    DisconnectedSessionInfo disconnectData =
-                                        new DisconnectedSessionInfo(outDesc, connId.connectionIndex());
+                                    SessionInfo sesInfo =
+                                            new SessionInfo(ses, connId.connectionIndex(), SessionState.RECONNECT);
 
-                                    commWorker.addProcessDisconnectRequest(disconnectData);
+                                    commWorker.addSessionStateChangeRequest(sesInfo);
                                 }
                             }
                             else
@@ -437,14 +444,69 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 if (msg instanceof NodeIdMessage) {
                     sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0);
-                    connKey = new ConnectionKey(sndId, 0, -1);
+
+                    if (ses.remoteAddress() != null) { // Not shmem.
+                        assert !ses.accepted();
+
+                        ConnectContext ctx = ses.meta(CONN_CTX_META_KEY);
+
+                        assert ctx != null;
+                        assert ctx.expNodeId != null;
+
+                        if (sndId.equals(ctx.expNodeId)) {
+                            GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor();
+
+                            assert recoveryDesc != null;
+
+                            long connCnt = recoveryDesc.incrementConnectCount();
+
+                            connKey = new ConnectionKey(sndId, ctx.connIdx, connCnt);
+
+                            final ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey);
+
+                            assert old == null;
+
+                            ses.send(IGNITE_HEADER_MSG);
+
+                            ClusterNode locNode = getLocalNode();
+
+                            if (locNode == null) {
+                                commWorker.addSessionStateChangeRequest(new SessionInfo(ses, SessionState.CLOSE,
+                                    new IgniteCheckedException("Local node has not been started or " +
+                                        "fully initialized [isStopping=" + getSpiContext().isStopping() + ']')));
+
+                                return;
+                            }
+
+                            int handshakeConnIdx = connPlc.connectionIndex();
+
+                            HandshakeMessage handshakeMsg = new HandshakeMessage2(locNode.id(), connCnt,
+                                recoveryDesc.received(), handshakeConnIdx);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Write handshake message [rmtNode=" + sndId +
+                                    ", msg=" + handshakeMsg + ']');
+
+                            ses.send(handshakeMsg);
+                        }
+                        else {
+                            commWorker.addSessionStateChangeRequest(new SessionInfo(ses, SessionState.CLOSE,
+                                new IgniteCheckedException("Remote node ID is not as expected [expected=" +
+                                    ctx.expNodeId + ", rcvd=" + sndId + ']')));
+                        }
+
+                        return;
+                    }
+                    else
+                        connKey = new ConnectionKey(sndId, 0, -1);
                 }
                 else {
                     assert msg instanceof HandshakeMessage : msg;
 
                     HandshakeMessage msg0 = (HandshakeMessage)msg;
 
-                    sndId = ((HandshakeMessage)msg).nodeId();
+                    sndId = msg0.nodeId();
+
                     connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount());
                 }
 
@@ -485,30 +547,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (reserve)
                         connectedNew(recoveryDesc, ses, true);
                     else {
-                        if (c.failed) {
-                            ses.send(new RecoveryLastReceivedMessage(-1));
-
-                            for (GridNioSession ses0 : nioSrvr.sessions()) {
-                                ConnectionKey key0 = ses0.meta(CONN_IDX_META);
-
-                                if (ses0.accepted() && key0 != null &&
-                                    key0.nodeId().equals(connKey.nodeId()) &&
-                                    key0.connectionIndex() == connKey.connectionIndex() &&
-                                    key0.connectCount() < connKey.connectCount())
-                                    ses0.close();
-                            }
+                        for (GridNioSession ses0 : nioSrvr.sessions()) {
+                            ConnectionKey key0 = ses0.meta(CONN_IDX_META);
+
+                            if (ses0.accepted() && key0 != null &&
+                                key0.nodeId().equals(connKey.nodeId()) &&
+                                key0.connectionIndex() == connKey.connectionIndex() &&
+                                key0.connectCount() < connKey.connectCount())
+                                ses0.close();
                         }
                     }
                 }
                 else {
                     assert connKey.connectionIndex() >= 0 : connKey;
 
-                    GridCommunicationClient[] curClients = clients.get(sndId);
-
-                    GridCommunicationClient oldClient =
-                        curClients != null && connKey.connectionIndex() < curClients.length ?
-                            curClients[connKey.connectionIndex()] :
-                            null;
+                    GridCommunicationClient oldClient = nodeClient(sndId, connKey.connectionIndex());
 
                     boolean hasShmemClient = false;
 
@@ -537,10 +590,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey);
 
                     if (oldFut == null) {
-                        curClients = clients.get(sndId);
-
-                        oldClient = curClients != null && connKey.connectionIndex() < curClients.length ?
-                            curClients[connKey.connectionIndex()] : null;
+                        oldClient = nodeClient(sndId, connKey.connectionIndex());
 
                         if (oldClient != null) {
                             if (oldClient instanceof GridTcpNioCommunicationClient) {
@@ -584,7 +634,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         }
                     }
                     else {
-                        if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
+                        if (oldFut instanceof ReserveClientFuture && locNode.order() < rmtNode.order()) {
                             if (log.isDebugEnabled()) {
                                 log.debug("Received incoming connection from remote node while " +
                                     "connecting to this node, rejecting [locNode=" + locNode.id() +
@@ -606,17 +656,31 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 }
             }
 
-            @Override public void onMessage(GridNioSession ses, Message msg) {
+            @Override public void onMessage(final GridNioSession ses, Message msg) {
                 ConnectionKey connKey = ses.meta(CONN_IDX_META);
 
                 if (connKey == null) {
-                    assert ses.accepted() : ses;
-
-                    if (!connectGate.tryEnter()) {
+                    if (ses.accepted() && !connectGate.tryEnter()) { // Outgoing connection already entered gate.
                         if (log.isDebugEnabled())
                             log.debug("Close incoming connection, failed to enter gateway.");
 
-                        ses.close();
+                        try {
+                            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1), new IgniteInClosure<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?> fut) {
+                                    try {
+                                        fut.get();
+
+                                        ses.close();
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        U.error(log, "Failed to send last received message: " + e, e);
+                                    }
+                                }
+                            });
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to send message: " + e, e);
+                        }
 
                         return;
                     }
@@ -625,7 +689,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         onFirstMessage(ses, msg);
                     }
                     finally {
-                        connectGate.leave();
+                        if (ses.accepted())
+                            connectGate.leave();
                     }
                 }
                 else {
@@ -637,13 +702,49 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         if (recovery != null) {
                             RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg;
 
+                            long rcvCnt = msg0.received();
+
                             if (log.isDebugEnabled()) {
                                 log.debug("Received recovery acknowledgement [rmtNode=" + connKey.nodeId() +
                                     ", connIdx=" + connKey.connectionIndex() +
                                     ", rcvCnt=" + msg0.received() + ']');
                             }
 
-                            recovery.ackReceived(msg0.received());
+                            ConnectContext ctx = ses.meta(CONN_CTX_META_KEY);
+
+                            if (!ses.accepted() && ctx != null && ctx.rcvCnt == Long.MIN_VALUE) {
+                                HandshakeTimeoutObject timeoutObj = ctx.handshakeTimeoutObj;
+
+                                Exception err = null;
+
+                                if (timeoutObj != null && !cancelHandshakeTimeout(timeoutObj)) {
+                                        err = new HandshakeTimeoutException("Failed to perform handshake due to timeout " +
+                                            "(consider increasing 'connectionTimeout' configuration property).");
+                                }
+
+                                if (rcvCnt == -1 || err != null) {
+                                    if (ses.remoteAddress() != null) {
+                                        SessionInfo sesInfo = new SessionInfo(ses, SessionState.CLOSE, err);
+
+                                        commWorker.addSessionStateChangeRequest(sesInfo);
+                                    }
+                                }
+                                else {
+                                    ctx.rcvCnt = rcvCnt;
+
+                                    recovery.onHandshake(rcvCnt);
+
+                                    nioSrvr.resend(ses);
+
+                                    recovery.onConnected();
+
+                                    SessionInfo sesInfo = new SessionInfo(ses, connKey.idx, SessionState.READY);
+
+                                    commWorker.addSessionStateChangeRequest(sesInfo);
+                                }
+                            }
+                            else
+                                recovery.ackReceived(rcvCnt);
 
                             return;
                         }
@@ -661,7 +762,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                         ", rcvCnt=" + rcvCnt + ']');
                                 }
 
-                                ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt));
+                                if (!skipAck)
+                                    ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt));
 
                                 recovery.lastAcknowledged(rcvCnt);
                             }
@@ -712,7 +814,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 assert connKey != null && connKey.connectionIndex() >= 0 : connKey;
                 assert !usePairedConnections(node);
 
-                recovery.onHandshake(rcvCnt);
+                if (ses.accepted())
+                    recovery.onHandshake(rcvCnt);
 
                 ses.inRecoveryDescriptor(recovery);
                 ses.outRecoveryDescriptor(recovery);
@@ -778,9 +881,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 /** */
                 private final ClusterNode rmtNode;
 
-                /** */
-                private boolean failed;
-
                 /**
                  * @param ses Incoming session.
                  * @param recoveryDesc Recovery descriptor.
@@ -797,8 +897,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 /** {@inheritDoc} */
                 @Override public void apply(Boolean success) {
                     try {
-                        failed = !success;
-
                         if (success) {
                             IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
                                 @Override public void apply(IgniteInternalFuture<?> msgFut) {
@@ -919,10 +1017,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
                     else {
                         try {
-                            fut.onDone();
+                            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1), new IgniteInClosure<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?> msgFut) {
+                                    try {
+                                        msgFut.get();
+                                    } catch (IgniteCheckedException e) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Failed to send recovery handshake " +
+                                                    "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');
+
+                                        recoveryDesc.release();
+                                    } finally {
+                                        fut.onDone();
+
+                                        clientFuts.remove(connKey, fut);
+
+                                        ses.close();
+                                    }
+                                }
+                            });
                         }
-                        finally {
-                            clientFuts.remove(connKey, fut);
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to send message: " + e, e);
                         }
                     }
                 }
@@ -1727,12 +1843,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             nioSrvr.dumpStats();
     }
 
-    /** */
-    private final ThreadLocal<Integer> threadConnIdx = new ThreadLocal<>();
-
-    /** */
-    private final AtomicInteger connIdx = new AtomicInteger();
-
     /** {@inheritDoc} */
     @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
         initFailureDetectionTimeout();
@@ -1988,9 +2098,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
+                        UUID rmtNodeId = null;
+
                         ConnectionKey key = ses.meta(CONN_IDX_META);
 
-                        return key != null ? formatter.writer(key.nodeId()) : null;
+                        if (key != null)
+                            rmtNodeId = key.nodeId();
+                        else { // No connection key on the session yet: fall back to the connect context.
+                            ConnectContext ctx = ses.meta(CONN_CTX_META_KEY);
+
+                            if (ctx != null)
+                                rmtNodeId = ctx.expNodeId; // Expected remote node ID.
+                        }
+                        // Test the resolved ID; testing 'key' would discard the fallback computed above.
+                        return rmtNodeId != null ? formatter.writer(rmtNodeId) : null;
                     }
                 };
 
@@ -2000,7 +2121,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>() {
                     @Override public boolean apply(Message msg) {
-                        return msg instanceof RecoveryLastReceivedMessage;
+                        return msg instanceof NotRecoverable;
                     }
                 };
 
@@ -2339,48 +2460,108 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
             int connIdx = connPlc.connectionIndex();
 
+            send(node, connIdx, msg, ackC);
+        }
+    }
+
+    /**
+     * Sends the message over the cached client if it can be reserved, otherwise over an asynchronously reserved one.
+     */
+    private void send(final ClusterNode node,
+                      final int connIdx,
+                      final Message msg,
+                      final IgniteInClosure<IgniteException> ackC
+    ) {
+        final GridCommunicationClient client = nodeClient(node.id(), connIdx);
+
+        if (client != null && client.reserve()) { // Cached client was reserved: send right away.
             try {
-                boolean retry;
+                send0(client, node, msg, ackC);
+            }
+            catch (IgniteCheckedException e) {
+                if (removeNodeClient(node.id(), client)) // Close only if this call removed the client.
+                    client.forceClose();
 
-                do {
-                    client = reserveClient(node, connIdx);
+                throw new IgniteSpiException("Failed to send message to remote node: " + node, e);
+            }
+        }
+        else { // No cached client, or reserve() on it returned false.
+            if (client != null)
+                removeNodeClient(node.id(), client);
 
-                    UUID nodeId = null;
+            IgniteInternalFuture<GridCommunicationClient> clientFut = reserveClient(node, connIdx);
 
-                    if (!client.async())
-                        nodeId = node.id();
+            clientFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+                @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut) {
+                    GridCommunicationClient client = null; // Stays null if fut.get() throws.
 
-                    retry = client.sendMessage(nodeId, msg, ackC);
+                    try {
+                        client = fut.get();
 
-                    client.release();
+                        send0(client, node, msg, ackC);
+                    }
+                    catch (IgniteCheckedException e) { // Logged and client dropped; nothing is rethrown.
+                        LT.error(log, e, "Unexpected error occurred during sending of message to node: " + node.id());
 
-                    if (!retry)
-                        sentMsgsCnt.increment();
-                    else {
-                        removeNodeClient(node.id(), client);
+                        if (client != null && removeNodeClient(node.id(), client))
+                            client.forceClose();
+                    }
+                }
+            });
+        }
+    }
 
-                        ClusterNode node0 = getSpiContext().node(node.id());
+    /**
+     * @param client Reserved client to send through; released after {@code sendMessage} returns.
+     * @param node Destination node.
+     * @param msg Message to send.
+     * @param ackC Ack closure handed to {@code client.sendMessage}.
+     */
+    private void send0(
+        GridCommunicationClient client,
+        ClusterNode node,
+        Message msg,
+        IgniteInClosure<IgniteException> ackC
+    ) throws IgniteCheckedException {
+        assert client != null;
 
-                        if (node0 == null)
-                            throw new IgniteCheckedException("Failed to send message to remote node " +
-                                "(node has left the grid): " + node.id());
-                    }
+        UUID nodeId = null;
 
-                    client = null;
-                }
-                while (retry);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteSpiException("Failed to send message to remote node: " + node, e);
-            }
-            finally {
-                if (client != null && removeNodeClient(node.id(), client))
-                    client.forceClose();
+        if (!client.async()) // Node ID is passed to sendMessage() only for non-async clients.
+            nodeId = node.id();
+
+        boolean retry = client.sendMessage(nodeId, msg, ackC);
+
+        client.release(); // NOTE(review): not reached if sendMessage() throws.
+
+        if (!retry)
+            sentMsgsCnt.increment();
+        else { // sendMessage() returned true: drop this client and retry via send().
+            removeNodeClient(node.id(), client);
+
+            ClusterNode node0 = getSpiContext().node(node.id());
+
+            if (node0 == null) { // SPI context no longer returns the node: warn and give up.
+                U.warn(log, "Failed to send message to remote node (node has left the grid): " + node.id());
+
+                return;
             }
+
+            send(node, client.connectionIndex(), msg, ackC);
         }
     }
 
     /**
+     * @param nodeId Node id.
+     * @param connIdx Connection index.
+     */
+    private GridCommunicationClient nodeClient(UUID nodeId, int connIdx) {
+        GridCommunicationClient[] nodeClients = clients.get(nodeId);
+        // No client array for the node, or an index past its end, means there is no cached client.
+        return nodeClients == null || connIdx >= nodeClients.length ? null : nodeClients[connIdx];
+    }
+
+    /**
      * @param nodeId Node ID.
      * @param rmvClient Client to remove.
      * @return {@code True} if client was removed.
@@ -2449,95 +2630,245 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * @param node Node to which client should be open.
      * @param connIdx Connection index.
      * @return The existing or just created client.
-     * @throws IgniteCheckedException Thrown if any exception occurs.
      */
-    private GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
-        assert node != null;
-        assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx;
+    private IgniteInternalFuture<GridCommunicationClient> reserveClient(ClusterNode node, int connIdx) {
+        GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
 
-        UUID nodeId = node.id();
+        tryReserveClient(node, connIdx, fut);
 
-        while (true) {
-            GridCommunicationClient[] curClients = clients.get(nodeId);
+        return fut;
+    }
+
+    /**
+     * @param node Node to reserve a communication client for.
+     * @param connIdx Connection index of the client to reserve.
+     * @param fut Result future: done with the client or an error; a {@code null} attempt result triggers a retry.
+     */
+    private void tryReserveClient(
+            final ClusterNode node,
+            final int connIdx,
+            final GridFutureAdapter<GridCommunicationClient> fut)
+    {
+        final ReserveClientFuture reserveFut = new ReserveClientFuture(node, connIdx);
+
+        reserveFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+            @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut0) {
+                try {
+                    GridCommunicationClient client = fut0.get();
+
+                    if (client != null)
+                        fut.onDone(client);
+                    else
+                        tryReserveClient(node, connIdx, fut);
+                }
+                catch (IgniteCheckedException e) {
+                    fut.onDone(e);
+                }
+            }
+        });
+
+        try {
+            reserveFut.reserve();
+        }
+        catch (Exception e) {
+            fut.onDone(e);
+        }
+    }
+
+    /**
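+     * Future for one attempt to reserve a client; completes with {@code null} when the attempt must be retried.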
+     */
+    private class ReserveClientFuture extends GridFutureAdapter<GridCommunicationClient> {
+        /** Node. */
+        private final ClusterNode node;
+
+        /** Connection index. */
+        private final int connIdx;
+
+        /**
+         * @param node Node.
+         */
+        ReserveClientFuture(ClusterNode node, int connIdx) {
+            assert node != null;
+            assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx;
+
+            this.node = node;
+            this.connIdx = connIdx;
+        }
+
+        /**
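+         * Reserves the node's existing client or, if there is none, starts a connect; the outcome completes this future.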
+         */
+        void reserve() {
+            final UUID nodeId = node.id();
 
-            GridCommunicationClient client = curClients != null && connIdx < curClients.length ?
-                curClients[connIdx] : null;
+            final GridCommunicationClient client = nodeClient(nodeId, connIdx);
+
+            final GridFutureAdapter<GridCommunicationClient> connFut;
 
             if (client == null) {
-                if (stopping)
-                    throw new IgniteSpiException("Node is stopping.");
+                if (stopping) {
+                    onDone(new IgniteSpiException("Node is stopping."));
+
+                    return;
+                }
 
                 // Do not allow concurrent connects.
-                GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture();
+                connFut = this;
 
-                ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);
+                final ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);
 
-                GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);
+                final GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, connFut);
 
                 if (oldFut == null) {
                     try {
-                        GridCommunicationClient[] curClients0 = clients.get(nodeId);
-
-                        GridCommunicationClient client0 = curClients0 != null && connIdx < curClients0.length ?
-                            curClients0[connIdx] : null;
+                        GridCommunicationClient client0 = nodeClient(nodeId, connIdx);
 
                         if (client0 == null) {
-                            client0 = createNioClient(node, connIdx);
-
-                            if (client0 != null) {
-                                addNodeClient(node, connIdx, client0);
+                            IgniteInternalFuture<GridCommunicationClient> clientFut = createNioClient(node, connIdx);
 
-                                if (client0 instanceof GridTcpNioCommunicationClient) {
-                                    GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0);
-
-                                    if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Session was closed after client creation, will retry " +
-                                                "[node=" + node + ", client=" + client0 + ']');
+                            clientFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+                                @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut) {
+                                    try {
+                                        GridCommunicationClient client0 = fut.get();
+
+                                        if (client0 != null) {
+                                            addNodeClient(node, connIdx, client0);
+
+                                            if (client0 instanceof GridTcpNioCommunicationClient) {
+                                                GridTcpNioCommunicationClient tcpClient =
+                                                    ((GridTcpNioCommunicationClient)client0);
+
+                                                if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) {
+                                                    if (log.isDebugEnabled())
+                                                        log.debug("Session was closed after client creation, " +
+                                                                "will retry [node=" + node + ", client=" + client0 + ']');
+
+                                                    client0 = null;
+                                                }
+                                            }
+
+                                            if (client0 == null) {
+                                                clientFuts.remove(connKey, connFut);
+
+                                                onDone();
+                                            }
+                                            else if (client0.reserve()) {
+                                                clientFuts.remove(connKey, connFut);
+
+                                                onDone(client0);
+                                            }
+                                            else {
+                                                clientFuts.remove(connKey, connFut);
+
+                                                removeNodeClient(nodeId, client0);
+
+                                                onDone();
+                                            }
+                                        }
+                                        else {
+                                            final long currTime = U.currentTimeMillis();
+
+                                            addTimeoutObject(new IgniteSpiTimeoutObject() {
+                                                private final IgniteUuid id = IgniteUuid.randomUuid();
+
+                                                @Override public IgniteUuid id() {
+                                                    return id;
+                                                }
+
+                                                @Override public long endTime() {
+                                                    return currTime + 200;
+                                                }
+
+                                                @Override public void onTimeout() {
+                                                    SessionInfo sesInfo = new SessionInfo(null, SessionState.RETRY,
+                                                        new Runnable() {
+                                                            @Override public void run() {
+                                                                clientFuts.remove(connKey, connFut);
+
+                                                                onDone();
+                                                            }
+                                                        });
+
+                                                    commWorker.addSessionStateChangeRequest(sesInfo);
+                                                }
+                                            });
+                                        }
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        clientFuts.remove(connKey, connFut);
 
-                                        client0 = null;
+                                        onDone(e);
                                     }
                                 }
-                            }
-                            else
-                                U.sleep(200);
+                            });
                         }
+                        else {
+                            assert connIdx == client0.connectionIndex() : client0;
 
-                        fut.onDone(client0);
+                            if (client0.reserve())
+                                onDone(client0);
+                            else {
+                                removeNodeClient(nodeId, client0);
+
+                                onDone();
+                            }
+                        }
                     }
                     catch (Throwable e) {
-                        fut.onDone(e);
+                        connFut.onDone(e);
 
                         if (e instanceof Error)
                             throw (Error)e;
                     }
-                    finally {
-                        clientFuts.remove(connKey, fut);
-                    }
                 }
-                else
-                    fut = oldFut;
+                else {
+                    oldFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+                        @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut) {
+                            try {
+                                GridCommunicationClient client0 = fut.get();
 
-                client = fut.get();
+                                if (client0 == null) {
+                                    clientFuts.remove(connKey, oldFut);
 
-                if (client == null)
-                    continue;
+                                    onDone();
+                                }
+                                else if (client0.reserve()) {
+                                    clientFuts.remove(connKey, oldFut);
 
-                if (getSpiContext().node(nodeId) == null) {
-                    if (removeNodeClient(nodeId, client))
-                        client.forceClose();
+                                    onDone(client0);
+                                }
+                                else {
+                                    clientFuts.remove(connKey, oldFut);
+
+                                    removeNodeClient(nodeId, client0);
 
-                    throw new IgniteSpiException("Destination node is not in topology: " + node.id());
+                                    onDone();
+                                }
+                            }
+                            catch (IgniteCheckedException e) {
+                                onDone(e);
+                            }
+                        }
+                    });
                 }
             }
+            else {
+                assert connIdx == client.connectionIndex() : client;
+
+                if (client.reserve())
+                    onDone(client);
+                else {
+                    removeNodeClient(nodeId, client);
 
-            assert connIdx == client.connectionIndex() : client;
+                    onDone();
+                }
+            }
+        }
 
-            if (client.reserve())
-                return client;
-            else
-                // Client has just been closed by idle worker. Help it and try again.
-                removeNodeClient(nodeId, client);
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ReserveClientFuture.class, this);
         }
     }
 
@@ -2545,10 +2876,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * @param node Node to create client for.
      * @param connIdx Connection index.
      * @return Client.
-     * @throws IgniteCheckedException If failed.
      */
-    @Nullable private GridCommunicationClient createNioClient(ClusterNode node, int connIdx)
-        throws IgniteCheckedException {
+    protected IgniteInternalFuture<GridCommunicationClient> createNioClient(ClusterNode node, int connIdx) {
         assert node != null;
 
         Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT));
@@ -2556,7 +2885,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         ClusterNode locNode = getSpiContext().localNode();
 
         if (locNode == null)
-            throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)");
+            return new GridFinishedFuture<>(
+                new IgniteCheckedException("Failed to create NIO client (local node is stopping)")
+            );
 
         if (log.isDebugEnabled())
             log.debug("Creating NIO client to node: " + node);
@@ -2573,7 +2904,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 if (log.isDebugEnabled())
                     log.debug("Shmem client created: " + client);
 
-                return client;
+                return new GridFinishedFuture<>(client);
             }
             catch (IgniteCheckedException e) {
                 if (e.hasCause(IpcOutOfSystemResourcesException.class))
@@ -2584,21 +2915,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 else if (log.isDebugEnabled())
                     log.debug("Failed to establish shared memory connection with local node (node has left): " +
                         node.id());
+
+                return new GridFinishedFuture<>(e);
             }
         }
 
-        connectGate.enter();
 
         try {
-            GridCommunicationClient client = createTcpClient(node, connIdx);
-
-            if (log.isDebugEnabled())
-                log.debug("TCP client created: " + client);
-
-            return client;
+            return createTcpClient(node, connIdx);
         }
-        finally {
-            connectGate.leave();
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(e);
         }
     }
 
@@ -2647,12 +2974,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             try {
-                safeHandshake(client,
-                    null,
-                    node.id(),
-                    timeoutHelper.nextTimeoutChunk(connTimeout0),
-                    null,
-                    null);
+                safeHandshake(client, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
             }
             catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
                 client.forceClose();
@@ -2714,7 +3036,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             ConnectionKey id = ses.meta(CONN_IDX_META);
 
             if (id != null) {
-                ClusterNode node = getSpiContext().node(id.nodeId);
+                ClusterNode node = getSpiContext().node(id.nodeId());
 
                 if (node != null && node.isClient()) {
                     String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " +
@@ -2737,532 +3059,527 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      *
      * @param node Remote node.
      * @param connIdx Connection index.
-     * @return Client.
+     * @return Client future.
      * @throws IgniteCheckedException If failed.
      */
-    protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
-        Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS));
-        Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
-        Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
-        Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
-
-        boolean isRmtAddrsExist = (!F.isEmpty(rmtAddrs0) && boundPort != null);
-        boolean isExtAddrsExist = !F.isEmpty(extAddrs);
+    protected IgniteInternalFuture<GridCommunicationClient> createTcpClient(ClusterNode node, int connIdx)
+            throws IgniteCheckedException
+    {
+        TcpClientFuture fut = new TcpClientFuture(node, connIdx);
 
-        if (!isRmtAddrsExist && !isExtAddrsExist)
-            throw new IgniteCheckedException("Failed to send message to the destination node. Node doesn't have any " +
-                "TCP communication addresses or mapped external addresses. Check configuration and make sure " +
-                "that you use the same communication SPI on all nodes. Remote node id: " + node.id());
+        connectGate.enter();
 
-        LinkedHashSet<InetSocketAddress> addrs;
+        fut.connect();
 
-        // Try to connect first on bound addresses.
-        if (isRmtAddrsExist) {
-            List<InetSocketAddress> addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort));
+        fut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+            @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut0) {
+                connectGate.leave();
+            }
+        });
 
-            boolean sameHost = U.sameMacs(getSpiContext().localNode(), node);
+        return fut;
+    }
 
-            Collections.sort(addrs0, U.inetAddressesComparator(sameHost));
+    /**
+     * @param timeoutObj Timeout object to cancel; passed to {@code removeTimeoutObject} only if {@code cancel()} succeeded.
+     */
+    private boolean cancelHandshakeTimeout(HandshakeTimeoutObject timeoutObj) {
+        boolean cancelled = timeoutObj.cancel();
 
-            addrs = new LinkedHashSet<>(addrs0);
-        }
-        else
-            addrs = new LinkedHashSet<>();
+        if (cancelled)
+            removeTimeoutObject(timeoutObj);
 
-        // Then on mapped external addresses.
-        if (isExtAddrsExist)
-            addrs.addAll(extAddrs);
+        return cancelled;
+    }
 
-        Set<InetAddress> allInetAddrs = U.newHashSet(addrs.size());
+    /**
+     * Future that establishes a TCP client to a node by trying its addresses one at a time.
+     */
+    private class TcpClientFuture extends GridFutureAdapter<GridCommunicationClient> {
+        /** Node. */
+        private final ClusterNode node;
 
-        for (InetSocketAddress addr : addrs)
-            allInetAddrs.add(addr.getAddress());
+        /** Timeout helper. */
+        private final IgniteSpiOperationTimeoutHelper timeoutHelper =
+            new IgniteSpiOperationTimeoutHelper(TcpCommunicationSpi.this);
 
-        List<InetAddress> reachableInetAddrs = U.filterReachable(allInetAddrs);
+        /** Addresses. */
+        private Collection<InetSocketAddress> addrs;
 
-        if (reachableInetAddrs.size() < allInetAddrs.size()) {
-            LinkedHashSet<InetSocketAddress> addrs0 = U.newLinkedHashSet(addrs.size());
+        /** Addresses iterator. */
+        private Iterator<InetSocketAddress> addrsIt;
 
-            for (InetSocketAddress addr : addrs) {
-                if (reachableInetAddrs.contains(addr.getAddress()))
-                    addrs0.add(addr);
-            }
-            for (InetSocketAddress addr : addrs) {
-                if (!reachableInetAddrs.contains(addr.getAddress()))
-                    addrs0.add(addr);
-            }
+        /** Address currently being tried. */
+        private volatile InetSocketAddress currAddr;
 
-            addrs = addrs0;
-        }
+        /** Error this future is completed with once all addresses have been tried. */
+        private volatile IgniteCheckedException err;
 
-        if (log.isDebugEnabled())
-            log.debug("Addresses to connect for node [rmtNode=" + node.id() + ", addrs=" + addrs.toString() + ']');
+        /** Connect attempts for the current address (reset when switching to the next address). */
+        private volatile int connectAttempts;
 
-        boolean conn = false;
-        GridCommunicationClient client = null;
-        IgniteCheckedException errs = null;
+        /** Attempt number for the current address; each attempt doubles the connect timeout. */
+        private volatile int attempt;
 
-        int connectAttempts = 1;
+        /** Connection index. */
+        private volatile int connIdx;
 
-        for (InetSocketAddress addr : addrs) {
-            long connTimeout0 = connTimeout;
+        /**
+         * @param node Node.
+         */
+        TcpClientFuture(ClusterNode node, int connIdx) {
+            this.node = node;
+            this.connIdx = connIdx;
+        }
 
-            int attempt = 1;
+        /**
+         * Obtains addresses via {@code addrs()} and starts connecting; fails this future if that call throws.
+         */
+        void connect() {
+            try {
+                addrs = addrs();
+            }
+            catch (IgniteCheckedException e) {
+                onDone(e);
 
-            IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this);
+                return;
+            }
 
-            while (!conn) { // Reconnection on handshake timeout.
-                try {
-                    SocketChannel ch = SocketChannel.open();
+            addrsIt = addrs.iterator();
 
-                    ch.configureBlocking(true);
+            tryConnect(true);
+        }
 
-                    ch.socket().setTcpNoDelay(tcpNoDelay);
-                    ch.socket().setKeepAlive(true);
+        /**
+         * @param next If {@code true}, moves on to the next address and resets the attempt counters first.
+         */
+        private void tryConnect(boolean next) {
+            if (next && !addrsIt.hasNext()) {
+                IgniteCheckedException err0 = err;
 
-                    if (sockRcvBuf > 0)
-                        ch.socket().setReceiveBufferSize(sockRcvBuf);
+                assert err0 != null;
 
-                    if (sockSndBuf > 0)
-                        ch.socket().setSendBufferSize(sockSndBuf);
+                UUID nodeId = node.id();
 
-                    if (getSpiContext().node(node.id()) == null) {
-                        U.closeQuiet(ch);
+                if (getSpiContext().node(nodeId) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
+                        X.hasCause(err, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class,
+                                IgniteSpiOperationTimeoutException.class))
+                {
+                    LT.warn(log, "TcpCommunicationSpi failed to establish connection to node, node will be " +
+                        "dropped from cluster [" + "rmtNode=" + node + ", err=" + err +
+                        ", connectErrs=" + Arrays.toString(err.getSuppressed()) + ']');
 
-                        throw new ClusterTopologyCheckedException("Failed to send message " +
-                            "(node left topology): " + node);
-                    }
+                    getSpiContext().failNode(nodeId, "TcpCommunicationSpi failed to establish connection to node " +
+                        "[rmtNode=" + node + ", errs=" + err + ", connectErrs=" + Arrays.toString(err.getSuppressed()) + ']');
+                }
 
-                    ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);
+                onDone(err0);
 
-                    GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);
+                return;
+            }
 
-                    if (!recoveryDesc.reserve()) {
-                        U.closeQuiet(ch);
+            if (next) {
+                attempt = 0;
 
-                        return null;
-                    }
+                connectAttempts = 0;
 
-                    long rcvCnt = -1;
+                currAddr = addrsIt.next();
+            }
 
-                    Map<Integer, Object> meta = new HashMap<>();
+            InetSocketAddress addr = currAddr;
 
-                    GridSslMeta sslMeta = null;
+            try {
+                final SocketChannel ch = SocketChannel.open();
 
-                    try {
-                        ch.socket().connect(addr, (int)timeoutHelper.nextTimeoutChunk(connTimeout));
+                ch.configureBlocking(false);
 
-                        if (isSslEnabled()) {
-                            meta.put(SSL_META.ordinal(), sslMeta = new GridSslMeta());
+                ch.socket().setTcpNoDelay(tcpNoDelay);
+                ch.socket().setKeepAlive(true);
 
-                            SSLEngine sslEngine = ignite.configuration().getSslContextFactory().create().createSSLEngine();
+                if (sockRcvBuf > 0)
+                    ch.socket().setReceiveBufferSize(sockRcvBuf);
 
-                            sslEngine.setUseClientMode(true);
+                if (sockSndBuf > 0)
+                    ch.socket().setSendBufferSize(sockSndBuf);
 
-                            sslMeta.sslEngine(sslEngine);
-                        }
+                if (getSpiContext().node(node.id()) == null) {
+                    U.closeQuiet(ch);
 
-                        Integer handshakeConnIdx = connIdx;
+                    onError(new ClusterTopologyCheckedException("Failed to send message " +
+                        "(node left topology): " + node));
 
-                        rcvCnt = safeHandshake(ch,
-                            recoveryDesc,
-                            node.id(),
-                            timeoutHelper.nextTimeoutChunk(connTimeout0),
-                            sslMeta,
-                            handshakeConnIdx);
+                    return;
+                }
 
-                        if (rcvCnt == -1)
-                            return null;
-                    }
-                    finally {
-                        if (recoveryDesc != null && rcvCnt == -1)
-                            recoveryDesc.release();
-                    }
+                final ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);
 
-                    try {
-                        meta.put(CONN_IDX_META, connKey);
+                final GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);
 
-                        if (recoveryDesc != null) {
-                            recoveryDesc.onHandshake(rcvCnt);
+                if (!recoveryDesc.reserve()) {
+                    U.closeQuiet(ch);
 
-                            meta.put(-1, recoveryDesc);
-                        }
+                    onDone();
 
-                        GridNioSession ses = nioSrvr.createSession(ch, meta).get();
+                    return;
+                }
 
-                        client = new GridTcpNioCommunicationClient(connIdx, ses, log);
+                final Map<Integer, Object> meta = new HashMap<>();
 
-                        conn = true;
-                    }
-                    finally {
-                        if (!conn) {
-                            if (recoveryDesc != null)
-                                recoveryDesc.release();
-                        }
-                    }
-                }
-                catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
-                    if (client != null) {
-                        client.forceClose();
+                final ConnectContext ctx = new ConnectContext();
 
-                        client = null;
-                    }
+                ctx.expNodeId = node.id();
 
-                    if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
-                        timeoutHelper.checkFailureTimeoutReached(e))) {
+                ctx.tcpClientFut = this;
 
-                        String msg = "Handshake timed out (failure detection timeout is reached) " +
-                            "[failureDetectionTimeout=" + failureDetectionTimeout() + ", addr=" + addr + ']';
+                ctx.connIdx = connIdx;
 
-                        onException(msg, e);
+                meta.put(CONN_CTX_META_KEY, ctx);
 
-                        if (log.isDebugEnabled())
-                            log.debug(msg);
+                meta.put(RECOVERY_DESC_META_KEY, recoveryDesc);
 
-                        if (errs == null)
-                            errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
-                                "Make sure that each ComputeTask and cache Transaction has a timeout set " +
-                                "in order to prevent parties from waiting forever in case of network issues " +
-                                "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+                final int timeoutChunk = (int)timeoutHelper.nextTimeoutChunk(connTimeout);
 
-                        errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+                final int attempt0 = attempt;
 
-                        break;
-                    }
+                final ConnectionTimeoutObject connTimeoutObj = new ConnectionTimeoutObject(ch, meta,
+                        U.currentTimeMillis() + timeoutChunk * (1L << attempt0));
 
-                    assert !failureDetectionTimeoutEnabled();
+                addTimeoutObject(connTimeoutObj);
 
-                    onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
-                        ", addr=" + addr + ']', e);
+                boolean connect = ch.connect(addr);
 
-                    if (log.isDebugEnabled())
-                        log.debug(
-                            "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
-                                ", addr=" + addr + ", err=" + e + ']');
+                IgniteInClosure<IgniteInternalFuture<GridNioSession>> lsnr0 = new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
+                    @Override public void apply(final IgniteInternalFuture<GridNioSession> fut) {
+                        GridNioSession ses = null;
 
-                    if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
-                        if (log.isDebugEnabled())
-                            log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
-                                "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
-                                ", attempt=" + attempt + ", reconCnt=" + reconCnt +
-                                ", err=" + e.getMessage() + ", addr=" + addr + ']');
+                        try {
+                            ses = fut.get();
 
-                        if (errs == null)
-                            errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
-                                "Make sure that each ComputeTask and cache Transaction has a timeout set " +
-                                "in order to prevent parties from waiting forever in case of network issues " +
-                                "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+                            boolean canceled = connTimeoutObj.cancel();
 
-                        errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+                            if (canceled)
+                                removeTimeoutObject(connTimeoutObj);
+                            else {
+                                final GridNioSession ses0 = ses;
 
-                        break;
-                    }
-                    else {
-                        attempt++;
+                                Runnable clo = new Runnable() {
+                                    @Override public void run() {
+                                        GridNioFuture<Boolean> fut = nioSrvr.close(ses0);
 
-                        connTimeout0 *= 2;
+                                        final SocketTimeoutException e = new SocketTimeoutException("Connect timed " +
+                                            "(consider increasing 'connTimeout' configuration property) [addr=" +
+                                            currAddr + ", connTimeout=" + connTimeout + ']');
 
-                        // Continue loop.
-                    }
-                }
-                catch (Exception e) {
-                    if (client != null) {
-                        client.forceClose();
+                                        fut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+                                            @Override public void apply(IgniteInternalFuture<Boolean> fut0) {
+                                                Runnable clo = new Runnable() {
+                                                    @Override public void run() {
+                                                        onError(e);
+                                                    }
+                                                };
 
-                        client = null;
-                    }
+                                                SessionInfo sesInfo = new SessionInfo(null, SessionState.RETRY, clo);
 
-                    onException("Client creation failed [addr=" + addr + ", err=" + e + ']', e);
+                                                commWorker.addSessionStateChangeRequest(sesInfo);
+                                            }
+                                        });
+                                    }
+                                };
 
-                    if (log.isDebugEnabled())
-                        log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
+                                commWorker.addSessionStateChangeRequest(new SessionInfo(null, SessionState.RETRY, clo));
 
-                    boolean failureDetThrReached = timeoutHelper.checkFailureTimeoutReached(e);
+                                return;
+                            }
 
-                    if (failureDetThrReached)
-                        LT.warn(log, "Connect timed out (consider increasing 'failureDetectionTimeout' " +
-                            "configuration property) [addr=" + addr + ", failureDetectionTimeout=" +
-                            failureDetectionTimeout() + ']');
-                    else if (X.hasCause(e, SocketTimeoutException.class))
-                        LT.warn(log, "Connect timed out (consider increasing 'connTimeout' " +
-                            "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']');
+                            int timeoutChunk1 = (int) timeoutHelper.nextTimeoutChunk(connTimeout);
 
-                    if (errs == null)
-                        errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
-                            "Make sure that each ComputeTask and cache Transaction has a timeout set " +
-                            "in order to prevent parties from waiting forever in case of network issues " +
-                            "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+                            long time = U.currentTimeMillis() + timeoutChunk1 * (1L << attempt0);
 
-                    errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+                            HandshakeTimeoutObject<SocketChannel> handshakeTimeoutObj =
+                                    new HandshakeTimeoutObject<>(ch, TcpClientFuture.this, time);
 
-                    // Reconnect for the second time, if connection is not established.
-                    if (!failureDetThrReached && connectAttempts < 2 &&
-                        (e instanceof ConnectException || X.hasCause(e, ConnectException.class))) {
-                        connectAttempts++;
+                            ctx.handshakeTimeoutObj = handshakeTimeoutObj;
 
-                        continue;
-                    }
+                            addTimeoutObject(handshakeTimeoutObj);
+                        }
+                        catch (final IgniteSpiOperationTimeoutException e) {
+                            assert ses != null;
 
-                    break;
-                }
-            }
+                            final GridNioSession ses0 = ses;
 
-            if (conn)
-                break;
-        }
+                            commWorker.addSessionStateChangeRequest(new SessionInfo(null, SessionState.RETRY, new Runnable() {
+                                @Override public void run() {
+                                    GridNioFuture<Boolean> closeFut = nioSrvr.close(ses0);
 
-        if (client == null) {
-            assert errs != null;
+                                    closeFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+                                        @Override public void apply(IgniteInternalFuture<Boolean> fut0) {
+                                            Runnable clo = new Runnable() {
+                                                @Override public void run() {
+                                                    onError(e);
+                                                }
+                                            };
 
-            if (X.hasCause(errs, ConnectException.class))
-                LT.warn(log, "Failed to connect to a remote node " +
-                    "(make sure that destination node is alive and " +
-                    "operating system firewall is disabled on local and remote hosts) " +
-                    "[addrs=" + addrs + ']');
+                                            SessionInfo sesInfo = new SessionInfo(null, SessionState.RETRY, clo);
 
-            if (getSpiContext().node(node.id()) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
-                X.hasCause(errs, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class,
-                    IgniteSpiOperationTimeoutException.class)) {
-                LT.warn(log, "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
-                    "cluster [" +
-                    "rmtNode=" + node +
-                    ", err=" + errs +
-                    ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
+                                            commWorker.addSessionStateChangeRequest(sesInfo);
+                                        }
+                                    });
+                                }
+                            }));
+                        }
+                        catch (IgniteCheckedException e) {
+                            connTimeoutObj.cancel();
 
-                getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" +
-                    "rmtNode=" + node +
-                    ", errs=" + errs +
-                    ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
-            }
+                            removeTimeoutObject(connTimeoutObj);
 
-            throw errs;
+                            recoveryDesc.release();
+
+                            onError(e);
+                        }
+                    }
+                };
+
+                nioSrvr.createSession(ch, meta, !connect, lsnr0);
+            }
+            catch (Exception e) {
+                onDone(e);
+            }
         }
 
-        return client;
-    }
+        /**
+         * @param e Exception.
+         */
+        void onError(Exception e) {
+            if (e instanceof HandshakeTimeoutException || e instanceof IgniteSpiOperationTimeoutException) {
+                if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
+                    timeoutHelper.checkFailureTimeoutReached(e))) {
 
-    /**
-     * Performs handshake in timeout-safe way.
-     *
-     * @param client Client.
-     * @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}.
-     * @param rmtNodeId Remote node.
-     * @param timeout Timeout for handshake.
-     * @param sslMeta Session meta.
-     * @param handshakeConnIdx Non null connection index if need send it in handshake.
-     * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout.
-     * @return Handshake response.
-     */
-    @SuppressWarnings("ThrowFromFinallyBlock")
-    private <T> long safeHandshake(
-        T client,
-        @Nullable GridNioRecoveryDescriptor recovery,
-        UUID rmtNodeId,
-        long timeout,
-        GridSslMeta sslMeta,
-        @Nullable Integer handshakeConnIdx
-    ) throws IgniteCheckedException {
-        HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
+                    String msg = "Handshake timed out (failure detection timeout is reached) " +
+                        "[failureDetectionTimeout=" + failureDetectionTimeout() + ", addr=" + currAddr + ']';
 
-        addTimeoutObject(obj);
+                    onException(msg, e);
 
-        long rcvCnt = 0;
+                    if (log.isDebugEnabled())
+                        log.debug(msg);
 
-        try {
-            if (client instanceof GridCommunicationClient)
-                ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId));
-            else {
-                SocketChannel ch = (SocketChannel)client;
+                    if (err == null)
+                        err = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
+                            "Make sure that each ComputeTask and cache Transaction has a timeout set " +
+                            "in order to prevent parties from waiting forever in case of network issues " +
+                            "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
 
-                boolean success = false;
+                    err.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + currAddr, e));
 
-                try {
-                    BlockingSslHandler sslHnd = null;
+                    tryConnect(true);
 
-                    ByteBuffer buf;
+                    return;
+                }
 
-                    if (isSslEnabled()) {
-                        assert sslMeta != null;
+                assert !failureDetectionTimeoutEnabled();
 
-                        sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.nativeOrder(), log);
+                long connTimeout0 = connTimeout * attempt;
 
-                        if (!sslHnd.handshake())
-                            throw new IgniteCheckedException("SSL handshake is not completed.");
+                onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout +
+                    ", addr=" + currAddr + ']', e);
 
-                        ByteBuffer handBuff = sslHnd.applicationBuffer();
+                if (log.isDebugEnabled())
+                    log.debug(
+                        "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout +
+                            ", addr=" + currAddr + ", err=" + e + ']');
 
-                        if (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) {
-                            buf = ByteBuffer.allocate(1000);
+                if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
+                    if (log.isDebugEnabled())
+                        log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
+                            "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
+                            ", attempt=" + attempt + ", reconCnt=" + reconCnt +
+                            ", err=" + e.getMessage() + ", addr=" + currAddr + ']');
 
-                            int read = ch.read(buf);
+                    if (err == null)
+                        err = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
+                            "Make sure that each ComputeTask and cache Transaction has a timeout set " +
+                            "in order to prevent parties from waiting forever in case of network issues " +
+                            "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
 
-                            if (read == -1)
-                                throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
+                    err.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + currAddr, e));
 
-                            buf.flip();
+                    tryConnect(true);
+                }
+                else {
+                    attempt++;
 
-                            buf = sslHnd.decode(buf);
-                        }
-                        else
-                            buf = handBuff;
-                    }
-                    else {
-                        buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE);
+                    tryConnect(false); // Reconnection on handshake timeout.
+                }
+            }
+            else {
+                onException("Client creation failed [addr=" + currAddr + ", err=" + e + ']', e);
 
-                        for (int i = 0; i < NodeIdMessage.MESSAGE_FULL_SIZE; ) {
-                            int read = ch.read(buf);
+                if (log.isDebugEnabled())
+                    log.debug("Client creation failed [addr=" + currAddr + ", err=" + e + ']');
 
-                            if (read == -1)
-                                throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
+                boolean failureDetThrReached = timeoutHelper.checkFailureTimeoutReached(e);
 
-                            i += read;
-                        }
-                    }
+                if (failureDetThrReached)
+                    LT.warn(log, "Connect timed out (consider increasing 'failureDetectionTimeout' " +
+                        "configuration property) [addr=" + currAddr + ", failureDetectionTimeout=" +
+                        failureDetectionTimeout() + ']');
+                else if (X.hasCause(e, SocketTimeoutException.class))
+                    LT.warn(log, "Connect timed out (consider increasing 'connTimeout' " +
+                        "configuration property) [addr=" + currAddr + ", connTimeout=" + connTimeout + ']');
 
-                    UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);
+                if (err == null)
+                    err = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
+                        "Make sure that each ComputeTask and GridCacheTransaction has a timeout set " +
+                        "in order to prevent parties from waiting forever in case of network issues " +
+                        "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
 
-                    if (!rmtNodeId.equals(rmtNodeId0))
-                        throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId +
-                            ", rcvd=" + rmtNodeId0 + ']');
-                    else if (log.isDebugEnabled())
-                        log.debug("Received remote node ID: " + rmtNodeId0);
+                err.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + currAddr, e));
 
-                    if (isSslEnabled()) {
-                        assert sslHnd != null;
+                // Reconnect for the second time, if connection is not established.
+                int connectAttempts0;
 
-                        ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
-                    }
-                    else
-                        ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
+                if (!failureDetThrReached && (connectAttempts0 = connectAttempts) <= 2 &&
+                    (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) {
+                    connectAttempts = connectAttempts0 + 1;
 
-                    ClusterNode locNode = getLocalNode();
+                    tryConnect(false);
 
-                    if (locNode == null)
-                        throw new IgniteCheckedException("Local node has not been started or " +
-                            "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
+                    return;
+                }
 
-                    if (recovery != null) {
-                        HandshakeMessage msg;
+                tryConnect(true);
+            }
 
-                        int msgSize = HandshakeMessage.MESSAGE_FULL_SIZE;
+            onDone(e);
+        }
 
-                        if (handshakeConnIdx != null) {
-                            msg = new HandshakeMessage2(locNode.id(),
-                                recovery.incrementConnectCount(),
-                                recovery.received(),
-                                handshakeConnIdx);
+        /**
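+         * Builds the ordered set of remote node addresses to try: bound, then external, reachable ones first.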
+         */
+        private Collection<InetSocketAddress> addrs() throws IgniteCheckedException {
+            Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS));
+            Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
+            Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
+            Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
 
-                            msgSize += 4;
-                        }
-                        else {
-                            msg = new HandshakeMessage(locNode.id(),
-                                recovery.incrementConnectCount(),
-                                recovery.received());
-                        }
+            boolean isRmtAddrsExist = (!F.isEmpty(rmtAddrs0) && boundPort != null);
+            boolean isExtAddrsExist = !F.isEmpty(extAddrs);
 
-                        if (log.isDebugEnabled())
-                            log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
+            if (!isRmtAddrsExist && !isExtAddrsExist)
+                throw new IgniteCheckedException("Failed to send message to the destination node. Node doesn't have any " +
+                    "TCP communication addresses or mapped external addresses. Check configuration and make sure " +
+                    "that you use the same communication SPI on all nodes. Remote node id: " + node.id());
 
-                        buf = ByteBuffer.allocate(msgSize);
+            LinkedHashSet<InetSocketAddress> addrs;
 
-                        buf.order(ByteOrder.nativeOrder());
+            // Try to connect first on bound addresses.
+            if (isRmtAddrsExist) {
+                List<InetSocketAddress> addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort));
 
-                        boolean written = msg.writeTo(buf, null);
+                boolean sameHost = U.sameMacs(getSpiContext().localNode(), node);
 
-                        assert written;
+                Collections.sort(addrs0, U.inetAddressesComparator(sameHost));
 
-                        buf.flip();
+                addrs = new LinkedHashSet<>(addrs0);
+            }
+            else
+                addrs = new LinkedHashSet<>();
 
-                        if (isSslEnabled()) {
-                            assert sslHnd != null;
+            // Then on mapped external addresses.
+            if (isExtAddrsExist)
+                addrs.addAll(extAddrs);
 
-                            ch.write(sslHnd.encrypt(buf));
-                        }
-                        else
-                            ch.write(buf);
-                    }
-                    else {
-                        if (isSslEnabled()) {
-                            assert sslHnd != null;
+            Set<InetAddress> allInetAddrs = U.newHashSet(addrs.size());
 
-                            ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)));
-                        }
-                        else
-                            ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
-                    }
+            for (InetSocketAddress addr : addrs)
+                allInetAddrs.add(addr.getAddress());
 
-                    if (recovery != null) {
-                        if (log.isDebugEnabled())
-                            log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
+            List<InetAddress> reachableInetAddrs = U.filterReachable(allInetAddrs);
+
+            if (reachableInetAddrs.size() < allInetAddrs.size()) {
+                LinkedHashSet<InetSocketAddress> addrs0 = U.newLinkedHashSet(addrs.size());
 
-                        if (isSslEnabled()) {
-                            assert sslHnd != null;
+                for (InetSocketAddress addr : addrs) {
+                    if (reachableInetAddrs.contains(addr.getAddress()))
+                        addrs0.add(addr);
+                }
+                for (InetSocketAddress addr : addrs) {
+                    if (!reachableInetAddrs.contains(addr.getAddress()))
+                        addrs0.add(addr);
+                }
 
-                            buf = ByteBuffer.allocate(1000);
+                addrs = addrs0;
+            }
 
-                            ByteBuffer decode = null;
+            if (log.isDebugEnabled())
+                log.debug("Addresses to connect for node [rmtNode=" + node.id() + ", addrs=" + addrs.toString() + ']');
 
-                            buf.order(ByteOrder.nativeOrder());
+            return addrs;
+        }
 
-                            for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
-                                int read = ch.read(buf);
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TcpClientFuture.class, this);
+        }
+    }
 
-                                if (read == -1)
-                                    throw new IgniteCheckedException("Failed to read remote node recovery handshake " +
-                                        "(connection closed).");
+    /**
+     * Performs handshake in timeout-safe way.
+     *
+     * @param client Client.
+     * @param rmtNodeId Remote node.
+     * @param timeout Timeout for handshake.
+     * @throws IgniteCheckedException If handshake failed or wasn't completed within timeout.
+     * @return Handshake response.
+     */
+    @SuppressWarnings("ThrowFromFinallyBlock")
+    private <T> long safeHandshake(T client, UUID rmtNodeId, long timeout) throws IgniteCheckedException {
+        HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, null, U.currentTimeMillis() + timeout);
 
-                                buf.flip();
+        addTimeoutObject(obj);
 
-                                decode = sslHnd.decode(buf);
+        long rcvCnt = 0;
 
-                                i += decode.remaining();
+        try {
+            if (client instanceof GridCommunicationClient)
+                ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId));
+            else {
+                SocketChannel ch = (SocketChannel)client;
 
-                                buf.clear();
-                            }
+                boolean success = false;
 
-                            rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);
+                try {
+                    ByteBuffer buf;
 
-                            if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
-                                decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+                    buf = ByteBuffer.allocate(17);
 
-                                sslMeta.decodedBuffer(decode);
-                            }
+                    for (int i = 0; i < 17; ) {
+                        int read = ch.read(buf);
 
-                            ByteBuffer inBuf = sslHnd.inputBuffer();
+                        if (read == -1)
+                            throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
 
-                            if (inBuf.position() > 0)
-                                sslMeta.encodedBuffer(inBuf);
-                        }
-                        else {
-                            buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+                        i += read;
+                    }
 
-                            buf.order(ByteOrder.nativeOrder());
+                    UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);
 
-                            for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
-                                int read = ch.read(buf);
+                    if (!rmtNodeId.equals(rmtNodeId0))
+                        throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId +
+                            ", rcvd=" + rmtNodeId0 + ']');
+                    else if (log.isDebugEnabled())
+                        log.debug("Received remote node ID: " + rmtNodeId0);
 
-                                if (read == -1)
-                                    throw new IgniteCheckedException("Failed to read remote node recovery handshake " +
-                                        "(connection closed).");
+                    ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
 
-                                i += read;
-                            }
+                    ClusterNode locNode = getLocalNode();
 
-                            rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
-                        }
+                    if (locNode == null)
+                        throw new IgniteCheckedException("Local node has not been started or " +
+                            "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
 
-                        if (log.isDebugEnabled())
-                            log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+                    ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
 
-                        if (rcvCnt == -1) {
-                            if (log.isDebugEnabled())
-                                log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
-                        }
-                        else
-                            success = true;
-                    }
-                    else
-                        success = true;
+                    success = true;
                 }
                 catch (IOException e) {
                     if (log.isDebugEnabled())
@@ -3619,7 +3936,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      */
     private class CommunicationWorker extends IgniteSpiThread {
         /** */
-        private final BlockingQueue<DisconnectedSessionInfo> q = new LinkedBlockingQueue<>();
+        private final BlockingQueue<SessionInfo> q = new LinkedBlockingQueue<>();
 
         /**
          * @param igniteInstanceName Ignite instance name.
@@ -3634,10 +3951,56 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 log.debug("Tcp communication worker has been started.");
 
             while (!isInterrupted()) {
-                DisconnectedSessionInfo disconnectData = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
+                SessionInfo sesInfo = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
+
+                if (sesInfo != null) {
+                    ConnectContext ctx;
+
+                    TcpClientFuture clientFut;
+
+                    switch (sesInfo.state) {
+                        case RECONNECT:
+                            processDisconnect(sesInfo);
+
+                            break;
+
+                        case RETRY:
+                            Runnable clo = sesInfo.clo;
+
+                            assert clo != null;
 
-                if (disconnectData != null)
-                    processDisconnect(disconnectData);
+                            clo.run();
+
+                            break;
+
+                        case READY:
+                            ctx = sesInfo.ses.meta(CONN_CTX_META_KEY);
+
+                            assert ctx != null;
+                            assert ctx.tcpClientFut != null;
+
+                            GridTcpNioCommunicationClient client =
+                        

<TRUNCATED>