You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/09 09:29:06 UTC

[3/7] ignite git commit: IGNITE-3220 I/O bottleneck on server/client cluster configuration Communications optimizations: - possibility to open separate in/out connections - possibility to have multiple connections between nodes - implemented NIO sessions

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1fe437c..b392c07 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -46,6 +46,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLException;
 import org.apache.ignite.Ignite;
@@ -53,6 +54,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.AddressResolver;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -103,6 +105,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -179,6 +182,7 @@ import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META
  * <li>Node local IP address (see {@link #setLocalAddress(String)})</li>
  * <li>Node local port number (see {@link #setLocalPort(int)})</li>
  * <li>Local port range (see {@link #setLocalPortRange(int)}</li>
+ * <li>Connections per node (see {@link #setConnectionsPerNode(int)})</li>
  * <li>Connection buffer flush frequency (see {@link #setConnectionBufferFlushFrequency(long)})</li>
  * <li>Connection buffer size (see {@link #setConnectionBufferSize(int)})</li>
  * <li>Idle connection timeout (see {@link #setIdleConnectionTimeout(long)})</li>
@@ -238,6 +242,9 @@ import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META
 @IgniteSpiConsistencyChecked(optional = false)
 public class TcpCommunicationSpi extends IgniteSpiAdapter
     implements CommunicationSpi<Message>, TcpCommunicationSpiMBean {
+    /** */
+    private static final IgniteProductVersion MULTIPLE_CONN_SINCE_VER = IgniteProductVersion.fromString("1.8.0");
+
     /** IPC error message. */
     public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " +
         "(switching to TCP, may be slower).";
@@ -257,11 +264,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Node attribute that is mapped to node's external addresses (value is <tt>comm.tcp.ext-addrs</tt>). */
     public static final String ATTR_EXT_ADDRS = "comm.tcp.ext-addrs";
 
+    /** */
+    public static final String ATTR_PAIRED_CONN = "comm.tcp.pairedConnection";
+
     /** Default port which node sets listener to (value is <tt>47100</tt>). */
     public static final int DFLT_PORT = 47100;
 
     /** Default port which node sets listener for shared memory connections (value is <tt>48100</tt>). */
-    public static final int DFLT_SHMEM_PORT = 48100;
+    public static final int DFLT_SHMEM_PORT = -1;
 
     /** Default idle connection timeout (value is <tt>30000</tt>ms). */
     public static final long DFLT_IDLE_CONN_TIMEOUT = 30000;
@@ -283,12 +293,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /**
      * Default count of selectors for TCP server equals to
-     * {@code "Math.min(4, Runtime.getRuntime().availableProcessors())"}.
+     * {@code "Math.min(8, Runtime.getRuntime().availableProcessors())"}.
      */
-    public static final int DFLT_SELECTORS_CNT = Math.min(4, Runtime.getRuntime().availableProcessors());
+    public static final int DFLT_SELECTORS_CNT = Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
 
-    /** Node ID meta for session. */
-    private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey();
+    /** Connection index meta for session. */
+    private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey();
 
     /** Message tracker meta for session. */
     private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
@@ -303,11 +313,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     public static final boolean DFLT_TCP_NODELAY = true;
 
     /** Default received messages threshold for sending ack. */
-    public static final int DFLT_ACK_SND_THRESHOLD = 16;
+    public static final int DFLT_ACK_SND_THRESHOLD = 32;
 
     /** Default socket write timeout. */
     public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000;
 
+    /** Default connections per node. */
+    public static final int DFLT_CONN_PER_NODE = 1;
+
     /** No-op runnable. */
     private static final IgniteRunnable NOOP = new IgniteRunnable() {
         @Override public void run() {
@@ -327,11 +340,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** */
     private ConnectGateway connectGate;
 
+    /** */
+    private ConnectionPolicy connPlc;
+
     /** Server listener. */
     private final GridNioServerListener<Message> srvLsnr =
         new GridNioServerListenerAdapter<Message>() {
             @Override public void onSessionWriteTimeout(GridNioSession ses) {
-                LT.warn(log, "Communication SPI Session write timed out (consider increasing " +
+                LT.warn(log,"Communication SPI session write timed out (consider increasing " +
                     "'socketWriteTimeout' " + "configuration property) [remoteAddr=" + ses.remoteAddress() +
                     ", writeTimeout=" + sockWriteTimeout + ']');
 
@@ -347,46 +363,53 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (log.isDebugEnabled())
                         log.debug("Sending local node ID to newly accepted session: " + ses);
 
-                    ses.send(nodeIdMessage());
+                    try {
+                        ses.sendNoFuture(nodeIdMessage());
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to send message: " + e, e);
+                    }
                 }
             }
 
             @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
-                UUID id = ses.meta(NODE_ID_META);
+                ConnectionKey connId = ses.meta(CONN_IDX_META);
 
-                if (id != null) {
-                    GridCommunicationClient client = clients.get(id);
+                if (connId != null) {
+                    UUID id = connId.nodeId();
 
-                    if (client instanceof GridTcpNioCommunicationClient &&
-                        ((GridTcpNioCommunicationClient) client).session() == ses) {
-                        client.close();
+                    GridCommunicationClient[] nodeClients = clients.get(id);
 
-                        clients.remove(id, client);
+                    if (nodeClients != null) {
+                        for (GridCommunicationClient client : nodeClients) {
+                            if (client instanceof GridTcpNioCommunicationClient &&
+                                ((GridTcpNioCommunicationClient)client).session() == ses) {
+                                client.close();
+
+                                removeNodeClient(id, client);
+                            }
+                        }
                     }
 
                     if (!stopping) {
-                        boolean reconnect = false;
-
-                        GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor();
-
-                        if (recoveryData != null) {
-                            if (recoveryData.nodeAlive(getSpiContext().node(id))) {
-                                if (!recoveryData.messagesFutures().isEmpty()) {
-                                    reconnect = true;
+                        GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
 
+                        if (outDesc != null) {
+                            if (outDesc.nodeAlive(getSpiContext().node(id))) {
+                                if (!outDesc.messagesRequests().isEmpty()) {
                                     if (log.isDebugEnabled())
                                         log.debug("Session was closed but there are unacknowledged messages, " +
-                                            "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']');
+                                            "will try to reconnect [rmtNode=" + outDesc.node().id() + ']');
+
+                                    DisconnectedSessionInfo disconnectData =
+                                        new DisconnectedSessionInfo(outDesc, connId.connectionIndex());
+
+                                    commWorker.addProcessDisconnectRequest(disconnectData);
                                 }
                             }
                             else
-                                recoveryData.onNodeLeft();
+                                outDesc.onNodeLeft();
                         }
-
-                        DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(recoveryData,
-                            reconnect);
-
-                        commWorker.addProcessDisconnectRequest(disconnectData);
                     }
 
                     CommunicationListener<Message> lsnr0 = lsnr;
@@ -403,21 +426,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             private void onFirstMessage(GridNioSession ses, Message msg) {
                 UUID sndId;
 
-                if (msg instanceof NodeIdMessage)
-                    sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
+                ConnectionKey connKey;
+
+                if (msg instanceof NodeIdMessage) {
+                    sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0);
+                    connKey = new ConnectionKey(sndId, 0, -1);
+                }
                 else {
                     assert msg instanceof HandshakeMessage : msg;
 
+                    HandshakeMessage msg0 = (HandshakeMessage)msg;
+
                     sndId = ((HandshakeMessage)msg).nodeId();
+                    connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount());
                 }
 
                 if (log.isDebugEnabled())
                     log.debug("Remote node ID received: " + sndId);
 
-                final UUID old = ses.addMeta(NODE_ID_META, sndId);
-
-                assert old == null;
-
                 final ClusterNode rmtNode = getSpiContext().node(sndId);
 
                 if (rmtNode == null) {
@@ -429,57 +455,65 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     return;
                 }
 
+                final ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey);
+
+                assert old == null;
+
                 ClusterNode locNode = getSpiContext().localNode();
 
                 if (ses.remoteAddress() == null)
                     return;
 
-                GridCommunicationClient oldClient = clients.get(sndId);
+                assert msg instanceof HandshakeMessage : msg;
 
-                boolean hasShmemClient = false;
+                HandshakeMessage msg0 = (HandshakeMessage)msg;
 
-                if (oldClient != null) {
-                    if (oldClient instanceof GridTcpNioCommunicationClient) {
-                        if (log.isDebugEnabled())
-                            log.debug("Received incoming connection when already connected " +
-                                    "to this node, rejecting [locNode=" + locNode.id() +
-                                    ", rmtNode=" + sndId + ']');
+                if (usePairedConnections(rmtNode)) {
+                    final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey);
 
-                        ses.send(new RecoveryLastReceivedMessage(-1));
+                    ConnectClosureNew c = new ConnectClosureNew(ses, recoveryDesc, rmtNode);
 
-                        return;
-                    }
+                    boolean reserve = recoveryDesc.tryReserve(msg0.connectCount(), c);
+
+                    if (reserve)
+                        connectedNew(recoveryDesc, ses, true);
                     else {
-                        assert oldClient instanceof GridShmemCommunicationClient;
+                        if (c.failed) {
+                            ses.send(new RecoveryLastReceivedMessage(-1));
+
+                            for (GridNioSession ses0 : nioSrvr.sessions()) {
+                                ConnectionKey key0 = ses0.meta(CONN_IDX_META);
 
-                        hasShmemClient = true;
+                                if (ses0.accepted() && key0 != null &&
+                                    key0.nodeId().equals(connKey.nodeId()) &&
+                                    key0.connectionIndex() == connKey.connectionIndex() &&
+                                    key0.connectCount() < connKey.connectCount())
+                                    ses0.close();
+                            }
+                        }
                     }
                 }
+                else {
+                    assert connKey.connectionIndex() >= 0 : connKey;
 
-                GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
-
-                GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut);
-
-                assert msg instanceof HandshakeMessage : msg;
-
-                HandshakeMessage msg0 = (HandshakeMessage)msg;
+                    GridCommunicationClient[] curClients = clients.get(sndId);
 
-                final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode);
+                    GridCommunicationClient oldClient =
+                        curClients != null && connKey.connectionIndex() < curClients.length ?
+                            curClients[connKey.connectionIndex()] :
+                            null;
 
-                if (oldFut == null) {
-                    oldClient = clients.get(sndId);
+                    boolean hasShmemClient = false;
 
                     if (oldClient != null) {
                         if (oldClient instanceof GridTcpNioCommunicationClient) {
                             if (log.isDebugEnabled())
                                 log.debug("Received incoming connection when already connected " +
-                                        "to this node, rejecting [locNode=" + locNode.id() +
-                                        ", rmtNode=" + sndId + ']');
+                                    "to this node, rejecting [locNode=" + locNode.id() +
+                                    ", rmtNode=" + sndId + ']');
 
                             ses.send(new RecoveryLastReceivedMessage(-1));
 
-                            fut.onDone(oldClient);
-
                             return;
                         }
                         else {
@@ -489,51 +523,86 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         }
                     }
 
-                    boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
-                            new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
+                    GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
 
-                    if (log.isDebugEnabled())
-                        log.debug("Received incoming connection from remote node " +
+                    GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);
+
+                    final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey);
+
+                    if (oldFut == null) {
+                        curClients = clients.get(sndId);
+
+                        oldClient = curClients != null && connKey.connectionIndex() < curClients.length ?
+                            curClients[connKey.connectionIndex()] : null;
+
+                        if (oldClient != null) {
+                            if (oldClient instanceof GridTcpNioCommunicationClient) {
+                                assert oldClient.connectionIndex() == connKey.connectionIndex() : oldClient;
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Received incoming connection when already connected " +
+                                        "to this node, rejecting [locNode=" + locNode.id() +
+                                        ", rmtNode=" + sndId + ']');
+
+                                ses.send(new RecoveryLastReceivedMessage(-1));
+
+                                fut.onDone(oldClient);
+
+                                return;
+                            }
+                            else {
+                                assert oldClient instanceof GridShmemCommunicationClient;
+
+                                hasShmemClient = true;
+                            }
+                        }
+
+                        boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
+                            new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut));
+
+                        if (log.isDebugEnabled())
+                            log.debug("Received incoming connection from remote node " +
                                 "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']');
 
-                    if (reserved) {
-                        try {
-                            GridTcpNioCommunicationClient client =
+                        if (reserved) {
+                            try {
+                                GridTcpNioCommunicationClient client =
                                     connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
 
-                            fut.onDone(client);
-                        }
-                        finally {
-                            clientFuts.remove(rmtNode.id(), fut);
+                                fut.onDone(client);
+                            }
+                            finally {
+                                clientFuts.remove(connKey, fut);
+                            }
                         }
                     }
-                }
-                else {
-                    if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("Received incoming connection from remote node while " +
+                    else {
+                        if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Received incoming connection from remote node while " +
                                     "connecting to this node, rejecting [locNode=" + locNode.id() +
                                     ", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() +
                                     ", rmtNodeOrder=" + rmtNode.order() + ']');
-                        }
+                            }
 
-                        ses.send(new RecoveryLastReceivedMessage(-1));
-                    }
-                    else {
-                        // The code below causes a race condition between shmem and TCP (see IGNITE-1294)
-                        boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
-                                new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
+                            ses.send(new RecoveryLastReceivedMessage(-1));
+                        }
+                        else {
+                            // The code below causes a race condition between shmem and TCP (see IGNITE-1294)
+                            boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
+                                new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut));
 
-                        if (reserved)
-                            connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
+                            if (reserved)
+                                connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
+                        }
                     }
                 }
             }
 
             @Override public void onMessage(GridNioSession ses, Message msg) {
-                UUID sndId = ses.meta(NODE_ID_META);
+                ConnectionKey connKey = ses.meta(CONN_IDX_META);
 
-                if (sndId == null) {
+                if (connKey == null) {
                     assert ses.accepted() : ses;
 
                     if (!connectGate.tryEnter()) {
@@ -555,29 +624,37 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 else {
                     rcvdMsgsCnt.increment();
 
-                    GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor();
+                    if (msg instanceof RecoveryLastReceivedMessage) {
+                        GridNioRecoveryDescriptor recovery = ses.outRecoveryDescriptor();
 
-                    if (recovery != null) {
-                        if (msg instanceof RecoveryLastReceivedMessage) {
+                        if (recovery != null) {
                             RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg;
 
-                            if (log.isDebugEnabled())
-                                log.debug("Received recovery acknowledgement [rmtNode=" + sndId +
+                            if (log.isDebugEnabled()) {
+                                log.debug("Received recovery acknowledgement [rmtNode=" + connKey.nodeId() +
+                                    ", connIdx=" + connKey.connectionIndex() +
                                     ", rcvCnt=" + msg0.received() + ']');
+                            }
 
                             recovery.ackReceived(msg0.received());
 
                             return;
                         }
-                        else {
+                    }
+                    else {
+                        GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();
+
+                        if (recovery != null) {
                             long rcvCnt = recovery.onReceived();
 
                             if (rcvCnt % ackSndThreshold == 0) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Send recovery acknowledgement [rmtNode=" + sndId +
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Send recovery acknowledgement [rmtNode=" + connKey.nodeId() +
+                                        ", connIdx=" + connKey.connectionIndex() +
                                         ", rcvCnt=" + rcvCnt + ']');
+                                }
 
-                                nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(rcvCnt));
+                                ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt));
 
                                 recovery.lastAcknowledged(rcvCnt);
                             }
@@ -603,7 +680,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     else
                         c = NOOP;
 
-                    notifyListener(sndId, msg, c);
+                    notifyListener(connKey.nodeId(), msg, c);
                 }
             }
 
@@ -611,7 +688,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
              * @param recovery Recovery descriptor.
              * @param ses Session.
              * @param node Node.
-             * @param rcvCnt Number of received messages..
+             * @param rcvCnt Number of received messages.
              * @param sndRes If {@code true} sends response for recovery handshake.
              * @param createClient If {@code true} creates NIO communication client.
              * @return Client.
@@ -623,32 +700,128 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 long rcvCnt,
                 boolean sndRes,
                 boolean createClient) {
+                ConnectionKey connKey = ses.meta(CONN_IDX_META);
+
+                assert connKey != null && connKey.connectionIndex() >= 0 : connKey;
+                assert !usePairedConnections(node);
+
                 recovery.onHandshake(rcvCnt);
 
-                ses.recoveryDescriptor(recovery);
+                ses.inRecoveryDescriptor(recovery);
+                ses.outRecoveryDescriptor(recovery);
 
                 nioSrvr.resend(ses);
 
-                if (sndRes)
-                    nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
+                try {
+                    if (sndRes)
+                        nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send message: " + e, e);
+                }
 
-                recovery.connected();
+                recovery.onConnected();
 
                 GridTcpNioCommunicationClient client = null;
 
                 if (createClient) {
-                    client = new GridTcpNioCommunicationClient(ses, log);
-
-                    GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client);
+                    client = new GridTcpNioCommunicationClient(connKey.connectionIndex(), ses, log);
 
-                    assert oldClient == null : "Client already created [node=" + node + ", client=" + client +
-                        ", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']';
+                    addNodeClient(node, connKey.connectionIndex(), client);
                 }
 
                 return client;
             }
 
             /**
+             * @param recovery Recovery descriptor.
+             * @param ses Session.
+             * @param sndRes If {@code true} sends response for recovery handshake.
+             */
+            private void connectedNew(
+                GridNioRecoveryDescriptor recovery,
+                GridNioSession ses,
+                boolean sndRes) {
+                try {
+                    ses.inRecoveryDescriptor(recovery);
+
+                    if (sndRes)
+                        nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
+
+                    recovery.onConnected();
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send message: " + e, e);
+                }
+            }
+
+            /**
+             *
+             */
+            class ConnectClosureNew implements IgniteInClosure<Boolean> {
+                /** */
+                private static final long serialVersionUID = 0L;
+
+                /** */
+                private final GridNioSession ses;
+
+                /** */
+                private final GridNioRecoveryDescriptor recoveryDesc;
+
+                /** */
+                private final ClusterNode rmtNode;
+
+                /** */
+                private boolean failed;
+
+                /**
+                 * @param ses Incoming session.
+                 * @param recoveryDesc Recovery descriptor.
+                 * @param rmtNode Remote node.
+                 */
+                ConnectClosureNew(GridNioSession ses,
+                    GridNioRecoveryDescriptor recoveryDesc,
+                    ClusterNode rmtNode) {
+                    this.ses = ses;
+                    this.recoveryDesc = recoveryDesc;
+                    this.rmtNode = rmtNode;
+                }
+
+                /** {@inheritDoc} */
+                @Override public void apply(Boolean success) {
+                    try {
+                        failed = !success;
+
+                        if (success) {
+                            IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?> msgFut) {
+                                    try {
+                                        msgFut.get();
+
+                                        connectedNew(recoveryDesc, ses, false);
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Failed to send recovery handshake " +
+                                                    "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');
+
+                                        recoveryDesc.release();
+                                    }
+                                }
+                            };
+
+                            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr);
+                        }
+                        else
+                            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1));
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to send message: " + e, e);
+                    }
+                }
+            }
+
+            /**
              *
              */
             @SuppressWarnings("PackageVisibleInnerClass")
@@ -674,10 +847,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 /** */
                 private final boolean createClient;
 
+                /** */
+                private final ConnectionKey connKey;
+
                 /**
                  * @param ses Incoming session.
                  * @param recoveryDesc Recovery descriptor.
                  * @param rmtNode Remote node.
+                 * @param connKey Connection key.
                  * @param msg Handshake message.
                  * @param createClient If {@code true} creates NIO communication client..
                  * @param fut Connect future.
@@ -685,12 +862,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 ConnectClosure(GridNioSession ses,
                     GridNioRecoveryDescriptor recoveryDesc,
                     ClusterNode rmtNode,
+                    ConnectionKey connKey,
                     HandshakeMessage msg,
                     boolean createClient,
                     GridFutureAdapter<GridCommunicationClient> fut) {
                     this.ses = ses;
                     this.recoveryDesc = recoveryDesc;
                     this.rmtNode = rmtNode;
+                    this.connKey = connKey;
                     this.msg = msg;
                     this.createClient = createClient;
                     this.fut = fut;
@@ -699,39 +878,44 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 /** {@inheritDoc} */
                 @Override public void apply(Boolean success) {
                     if (success) {
-                        IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
-                            @Override public void apply(IgniteInternalFuture<?> msgFut) {
-                                try {
-                                    msgFut.get();
+                        try {
+                            IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?> msgFut) {
+                                    try {
+                                        msgFut.get();
 
-                                    GridTcpNioCommunicationClient client =
-                                        connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient);
+                                        GridTcpNioCommunicationClient client =
+                                                connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient);
 
-                                    fut.onDone(client);
-                                }
-                                catch (IgniteCheckedException e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Failed to send recovery handshake " +
-                                            "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');
+                                        fut.onDone(client);
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Failed to send recovery handshake " +
+                                                    "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');
 
-                                    recoveryDesc.release();
+                                        recoveryDesc.release();
 
-                                    fut.onDone();
-                                }
-                                finally {
-                                    clientFuts.remove(rmtNode.id(), fut);
+                                        fut.onDone();
+                                    }
+                                    finally {
+                                        clientFuts.remove(connKey, fut);
+                                    }
                                 }
-                            }
-                        };
+                            };
 
-                        nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr);
+                            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr);
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to send message: " + e, e);
+                        }
                     }
                     else {
                         try {
                             fut.onDone();
                         }
                         finally {
-                            clientFuts.remove(rmtNode.id(), fut);
+                            clientFuts.remove(connKey, fut);
                         }
                     }
                 }
@@ -794,6 +978,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Shared memory server. */
     private IpcSharedMemoryServerEndpoint shmemSrv;
 
+    /** */
+    private boolean usePairedConnections = true;
+
+    /** */
+    private int connectionsPerNode = DFLT_CONN_PER_NODE;
+
     /** {@code TCP_NODELAY} option value for created sockets. */
     private boolean tcpNoDelay = DFLT_TCP_NODELAY;
 
@@ -816,7 +1006,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>();
 
     /** Clients. */
-    private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap();
+    private final ConcurrentMap<UUID, GridCommunicationClient[]> clients = GridConcurrentFactory.newMap();
 
     /** SPI listener. */
     private volatile CommunicationListener<Message> lsnr;
@@ -830,6 +1020,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Count of selectors to use in TCP server. */
     private int selectorsCnt = DFLT_SELECTORS_CNT;
 
+    /**
+     * Defines how many non-blocking {@code selector.selectNow()} should be made before
+     * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}.
+     * Can be set to {@code Long.MAX_VALUE} so selector threads will never block.
+     */
+    private long selectorSpins = IgniteSystemProperties.getLong("IGNITE_SELECTOR_SPINS", 0L);
+
     /** Address resolver. */
     private AddressResolver addrRslvr;
 
@@ -863,11 +1060,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     };
 
     /** Client connect futures. */
-    private final ConcurrentMap<UUID, GridFutureAdapter<GridCommunicationClient>> clientFuts =
+    private final ConcurrentMap<ConnectionKey, GridFutureAdapter<GridCommunicationClient>> clientFuts =
         GridConcurrentFactory.newMap();
 
     /** */
-    private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap();
+    private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap();
+
+    /** */
+    private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> outRecDescs = GridConcurrentFactory.newMap();
+
+    /** */
+    private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> inRecDescs = GridConcurrentFactory.newMap();
 
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@@ -976,6 +1179,49 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         return locPortRange;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean isUsePairedConnections() {
+        return usePairedConnections;
+    }
+
+    /**
+     * Set this to {@code true} if {@code TcpCommunicationSpi} should
+     * maintain connection for outgoing and incoming messages separately.
+     * In this case total number of connections between local and each remote node
+     * is {@link #getConnectionsPerNode()} * 2.
+     * <p>
+     * Set this to {@code false} if each connection of {@link #getConnectionsPerNode()}
+     * should be used for outgoing and incoming messages. In this case total number
+     * of connections between local and each remote node is {@link #getConnectionsPerNode()}.
+     * In this case load NIO selectors load
+     * balancing of {@link GridNioServer} will be disabled.
+     * <p>
+     * Default is {@code true}.
+     *
+     * @param usePairedConnections {@code true} to use paired connections and {@code false} otherwise.
+     * @see #getConnectionsPerNode()
+     */
+    public void setUsePairedConnections(boolean usePairedConnections) {
+        this.usePairedConnections = usePairedConnections;
+    }
+
+    /**
+     * Sets number of connections to each remote node. if {@link #isUsePairedConnections()}
+     * is {@code true} then number of connections is doubled and half is used for incoming and
+     * half for outgoing messages.
+     *
+     * @param maxConnectionsPerNode Number of connections per node.
+     * @see #isUsePairedConnections()
+     */
+    public void setConnectionsPerNode(int maxConnectionsPerNode) {
+        this.connectionsPerNode = maxConnectionsPerNode;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getConnectionsPerNode() {
+        return connectionsPerNode;
+    }
+
     /**
      * Sets local port to accept shared memory connections.
      * <p>
@@ -1222,6 +1468,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         return selectorsCnt;
     }
 
+    /** {@inheritDoc} */
+    @Override public long getSelectorSpins() {
+        return selectorSpins;
+    }
+
+    /**
+     * Defines how many non-blocking {@code selector.selectNow()} should be made before
+     * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}.
+     * Can be set to {@code Long.MAX_VALUE} so selector threads will never block.
+     *
+     * @param selectorSpins Selector thread busy-loop iterations.
+     */
+    public void setSelectorSpins(long selectorSpins) {
+        this.selectorSpins = selectorSpins;
+    }
+
     /**
      * Sets value for {@code TCP_NODELAY} socket option. Each
      * socket will be opened using provided value.
@@ -1396,7 +1658,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (log != null) {
             StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors: ").append(U.nl());
 
-            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) {
+            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) {
                 GridNioRecoveryDescriptor desc = entry.getValue();
 
                 sb.append("    [key=").append(entry.getKey())
@@ -1409,14 +1671,48 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     .append(']').append(U.nl());
             }
 
-            sb.append("Communication SPI clients: ").append(U.nl());
+            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet()) {
+                GridNioRecoveryDescriptor desc = entry.getValue();
+
+                sb.append("    [key=").append(entry.getKey())
+                    .append(", msgsSent=").append(desc.sent())
+                    .append(", msgsAckedByRmt=").append(desc.acked())
+                    .append(", reserveCnt=").append(desc.reserveCount())
+                    .append(", connected=").append(desc.connected())
+                    .append(", reserved=").append(desc.reserved())
+                    .append(", descIdHash=").append(System.identityHashCode(desc))
+                    .append(']').append(U.nl());
+            }
+
+            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet()) {
+                GridNioRecoveryDescriptor desc = entry.getValue();
 
-            for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet()) {
-                sb.append("    [node=").append(entry.getKey())
-                    .append(", client=").append(entry.getValue())
+                sb.append("    [key=").append(entry.getKey())
+                    .append(", msgsRcvd=").append(desc.received())
+                    .append(", lastAcked=").append(desc.lastAcknowledged())
+                    .append(", reserveCnt=").append(desc.reserveCount())
+                    .append(", connected=").append(desc.connected())
+                    .append(", reserved=").append(desc.reserved())
+                    .append(", handshakeIdx=").append(desc.handshakeIndex())
+                    .append(", descIdHash=").append(System.identityHashCode(desc))
                     .append(']').append(U.nl());
             }
 
+            sb.append("Communication SPI clients: ").append(U.nl());
+
+            for (Map.Entry<UUID, GridCommunicationClient[]> entry : clients.entrySet()) {
+                UUID nodeId = entry.getKey();
+                GridCommunicationClient[] clients0 = entry.getValue();
+
+                for (GridCommunicationClient client : clients0) {
+                    if (client != null) {
+                        sb.append("    [node=").append(nodeId)
+                            .append(", client=").append(client)
+                            .append(']').append(U.nl());
+                    }
+                }
+            }
+
             U.warn(log, sb.toString());
         }
 
@@ -1426,6 +1722,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             nioSrvr.dumpStats();
     }
 
+    /** */
+    private final ThreadLocal<Integer> threadConnIdx = new ThreadLocal<>();
+
+    /** */
+    private final AtomicInteger connIdx = new AtomicInteger();
+
     /** {@inheritDoc} */
     @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
         initFailureDetectionTimeout();
@@ -1439,6 +1741,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
         assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
         assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
+        assertParameter(connectionsPerNode > 0, "connectionsPerNode > 0");
+        assertParameter(connectionsPerNode <= 1024, "connectionsPerNode <= 1024");
 
         if (!failureDetectionTimeoutEnabled()) {
             assertParameter(reconCnt > 0, "reconnectCnt > 0");
@@ -1458,6 +1762,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'.");
         }
 
+        if (connectionsPerNode > 1) {
+            connPlc = new ConnectionPolicy() {
+                @Override public int connectionIndex() {
+                    return (int)(U.safeAbs(Thread.currentThread().getId()) % connectionsPerNode);
+                }
+            };
+        }
+        else {
+            connPlc = new ConnectionPolicy() {
+                @Override public int connectionIndex() {
+                    return 0;
+                }
+            };
+        }
+
         try {
             locHost = U.resolveLocalHost(locAddr);
         }
@@ -1495,6 +1814,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             res.put(createSpiAttributeName(ATTR_PORT), boundTcpPort);
             res.put(createSpiAttributeName(ATTR_SHMEM_PORT), boundTcpShmemPort >= 0 ? boundTcpShmemPort : null);
             res.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
+            res.put(createSpiAttributeName(ATTR_PAIRED_CONN), usePairedConnections);
 
             return res;
         }
@@ -1524,6 +1844,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             log.debug(configInfo("sockRcvBuf", sockRcvBuf));
             log.debug(configInfo("shmemPort", shmemPort));
             log.debug(configInfo("msgQueueLimit", msgQueueLimit));
+            log.debug(configInfo("connectionsPerNode", connectionsPerNode));
 
             if (failureDetectionTimeoutEnabled()) {
                 log.debug(configInfo("connTimeout", connTimeout));
@@ -1548,6 +1869,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 ", slowClientQueueLimit=" + slowClientQueueLimit + ']');
         }
 
+        if (msgQueueLimit == 0)
+            U.quietAndWarn(log, "Message queue limit is set to 0 which may lead to " +
+                "potential OOMEs when running cache operations in FULL_ASYNC or PRIMARY_SYNC modes " +
+                "due to message queues growth on sender and reciever sides.");
+
         registerMBean(gridName, this, TcpCommunicationSpiMBean.class);
 
         connectGate = new ConnectGateway();
@@ -1642,9 +1968,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+                        ConnectionKey key = ses.meta(CONN_IDX_META);
 
-                        return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null;
+                        return key != null ? formatter.reader(key.nodeId(), msgFactory) : null;
                     }
                 };
 
@@ -1657,9 +1983,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+                        ConnectionKey key = ses.meta(CONN_IDX_META);
 
-                        return rmtNodeId != null ? formatter.writer(rmtNodeId) : null;
+                        return key != null ? formatter.writer(key.nodeId()) : null;
                     }
                 };
 
@@ -1716,6 +2042,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         .logger(log)
                         .selectorCount(selectorsCnt)
                         .gridName(gridName)
+                        .serverName("tcp-comm")
                         .tcpNoDelay(tcpNoDelay)
                         .directBuffer(directBuf)
                         .byteOrder(ByteOrder.nativeOrder())
@@ -1725,18 +2052,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         .directMode(true)
                         .metricsListener(metricsLsnr)
                         .writeTimeout(sockWriteTimeout)
+                        .selectorSpins(selectorSpins)
                         .filters(filters)
                         .writerFactory(writerFactory)
                         .skipRecoveryPredicate(skipRecoveryPred)
                         .messageQueueSizeListener(queueSizeMonitor)
+                        .balancing(usePairedConnections) // Current balancing logic assumes separate in/out connections.
                         .build();
 
                 boundTcpPort = port;
 
                 // Ack Port the TCP server was bound to.
                 if (log.isInfoEnabled())
-                    log.info("Successfully bound to TCP port [port=" + boundTcpPort +
-                        ", locHost=" + locHost + ']');
+                    log.info("Successfully bound communication NIO server to TCP port " +
+                        "[port=" + boundTcpPort + ", locHost=" + locHost + ", selectorsCnt=" + selectorsCnt +
+                        ", selectorSpins=" + srvr.selectorSpins() + ']');
 
                 srvr.idleTimeout(idleConnTimeout);
 
@@ -1837,8 +2167,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         shmemWorkers.clear();
 
         // Force closing on stop (safety).
-        for (GridCommunicationClient client : clients.values())
-            client.forceClose();
+        for (GridCommunicationClient[] clients0 : clients.values()) {
+            for (GridCommunicationClient client : clients0) {
+                if (client != null)
+                    client.forceClose();
+            }
+        }
 
         // Clear resources.
         nioSrvr = null;
@@ -1863,8 +2197,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             connectGate.stopped();
 
         // Force closing.
-        for (GridCommunicationClient client : clients.values())
-            client.forceClose();
+        for (GridCommunicationClient[] clients0 : clients.values()) {
+            for (GridCommunicationClient client : clients0) {
+                if (client != null)
+                    client.forceClose();
+            }
+        }
 
         getSpiContext().deregisterPorts();
 
@@ -1875,8 +2213,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) {
         connectGate.disconnected(reconnectFut);
 
-        for (GridCommunicationClient client : clients.values())
-            client.forceClose();
+        for (GridCommunicationClient[] clients0 : clients.values()) {
+            for (GridCommunicationClient client : clients0) {
+                if (client != null)
+                    client.forceClose();
+            }
+        }
 
         IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
             "Failed to connect client node disconnected.");
@@ -1885,6 +2227,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             clientFut.onDone(err);
 
         recoveryDescs.clear();
+        inRecDescs.clear();
+        outRecDescs.clear();
     }
 
     /** {@inheritDoc} */
@@ -1898,16 +2242,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     void onNodeLeft(UUID nodeId) {
         assert nodeId != null;
 
-        GridCommunicationClient client = clients.get(nodeId);
-
-        if (client != null) {
-            if (log.isDebugEnabled())
-                log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId +
-                    ", client=" + client + ']');
+        GridCommunicationClient[] clients0 = clients.remove(nodeId);
 
-            client.forceClose();
+        if (clients0 != null) {
+            for (GridCommunicationClient client : clients0) {
+                if (client != null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId +
+                            ", client=" + client + ']');
 
-            clients.remove(nodeId, client);
+                    client.forceClose();
+                }
+            }
         }
     }
 
@@ -1982,11 +2328,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         else {
             GridCommunicationClient client = null;
 
+            int connIdx = useMultipleConnections(node) ? connPlc.connectionIndex() : 0;
+
             try {
                 boolean retry;
 
                 do {
-                    client = reserveClient(node);
+                    client = reserveClient(node, connIdx);
 
                     UUID nodeId = null;
 
@@ -2000,7 +2348,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (!retry)
                         sentMsgsCnt.increment();
                     else {
-                        clients.remove(node.id(), client);
+                        removeNodeClient(node.id(), client);
 
                         ClusterNode node0 = getSpiContext().node(node.id());
 
@@ -2017,26 +2365,94 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 throw new IgniteSpiException("Failed to send message to remote node: " + node, e);
             }
             finally {
-                if (client != null && clients.remove(node.id(), client))
+                if (client != null && removeNodeClient(node.id(), client))
                     client.forceClose();
             }
         }
     }
 
     /**
+     * @param nodeId Node ID.
+     * @param rmvClient Client to remove.
+     * @return {@code True} if client was removed.
+     */
+    private boolean removeNodeClient(UUID nodeId, GridCommunicationClient rmvClient) {
+        for (;;) {
+            GridCommunicationClient[] curClients = clients.get(nodeId);
+
+            if (curClients == null || rmvClient.connectionIndex() >= curClients.length || curClients[rmvClient.connectionIndex()] != rmvClient)
+                return false;
+
+            GridCommunicationClient[] newClients = Arrays.copyOf(curClients, curClients.length);
+
+            newClients[rmvClient.connectionIndex()] = null;
+
+            if (clients.replace(nodeId, curClients, newClients))
+                return true;
+        }
+    }
+
+    /**
+     * @param node Node.
+     * @param connIdx Connection index.
+     * @param addClient Client to add.
+     */
+    private void addNodeClient(ClusterNode node, int connIdx, GridCommunicationClient addClient) {
+        assert connectionsPerNode > 0 : connectionsPerNode;
+        assert connIdx == addClient.connectionIndex() : addClient;
+
+        if (connIdx >= connectionsPerNode) {
+            assert !usePairedConnections(node);
+
+            return;
+        }
+
+        for (;;) {
+            GridCommunicationClient[] curClients = clients.get(node.id());
+
+            assert curClients == null || curClients[connIdx] == null : "Client already created [node=" + node.id() +
+                ", connIdx=" + connIdx +
+                ", client=" + addClient +
+                ", oldClient=" + curClients[connIdx] + ']';
+
+            GridCommunicationClient[] newClients;
+
+            if (curClients == null) {
+                newClients = new GridCommunicationClient[useMultipleConnections(node) ? connectionsPerNode : 1];
+                newClients[connIdx] = addClient;
+
+                if (clients.putIfAbsent(node.id(), newClients) == null)
+                    break;
+            }
+            else {
+                newClients = Arrays.copyOf(curClients, curClients.length);
+                newClients[connIdx] = addClient;
+
+                if (clients.replace(node.id(), curClients, newClients))
+                    break;
+            }
+        }
+    }
+
+    /**
      * Returns existing or just created client to node.
      *
      * @param node Node to which client should be open.
+     * @param connIdx Connection index.
      * @return The existing or just created client.
      * @throws IgniteCheckedException Thrown if any exception occurs.
      */
-    private GridCommunicationClient reserveClient(ClusterNode node) throws IgniteCheckedException {
+    private GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
         assert node != null;
+        assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx;
 
         UUID nodeId = node.id();
 
         while (true) {
-            GridCommunicationClient client = clients.get(nodeId);
+            GridCommunicationClient[] curClients = clients.get(nodeId);
+
+            GridCommunicationClient client = curClients != null && connIdx < curClients.length ?
+                curClients[connIdx] : null;
 
             if (client == null) {
                 if (stopping)
@@ -2045,25 +2461,27 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 // Do not allow concurrent connects.
                 GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture();
 
-                GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(nodeId, fut);
+                ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);
+
+                GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);
 
                 if (oldFut == null) {
                     try {
-                        GridCommunicationClient client0 = clients.get(nodeId);
+                        GridCommunicationClient[] curClients0 = clients.get(nodeId);
+
+                        GridCommunicationClient client0 = curClients0 != null && connIdx < curClients0.length ?
+                            curClients0[connIdx] : null;
 
                         if (client0 == null) {
-                            client0 = createNioClient(node);
+                            client0 = createNioClient(node, connIdx);
 
                             if (client0 != null) {
-                                GridCommunicationClient old = clients.put(nodeId, client0);
-
-                                assert old == null : "Client already created " +
-                                    "[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']';
+                                addNodeClient(node, connIdx, client0);
 
                                 if (client0 instanceof GridTcpNioCommunicationClient) {
                                     GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0);
 
-                                    if (tcpClient.session().closeTime() > 0 && clients.remove(nodeId, client0)) {
+                                    if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) {
                                         if (log.isDebugEnabled())
                                             log.debug("Session was closed after client creation, will retry " +
                                                 "[node=" + node + ", client=" + client0 + ']');
@@ -2085,7 +2503,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             throw (Error)e;
                     }
                     finally {
-                        clientFuts.remove(nodeId, fut);
+                        clientFuts.remove(connKey, fut);
                     }
                 }
                 else
@@ -2097,27 +2515,31 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     continue;
 
                 if (getSpiContext().node(nodeId) == null) {
-                    if (clients.remove(nodeId, client))
+                    if (removeNodeClient(nodeId, client))
                         client.forceClose();
 
                     throw new IgniteSpiException("Destination node is not in topology: " + node.id());
                 }
             }
 
+            assert connIdx == client.connectionIndex() : client;
+
             if (client.reserve())
                 return client;
             else
                 // Client has just been closed by idle worker. Help it and try again.
-                clients.remove(nodeId, client);
+                removeNodeClient(nodeId, client);
         }
     }
 
     /**
      * @param node Node to create client for.
+     * @param connIdx Connection index.
      * @return Client.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable protected GridCommunicationClient createNioClient(ClusterNode node) throws IgniteCheckedException {
+    @Nullable private GridCommunicationClient createNioClient(ClusterNode node, int connIdx)
+        throws IgniteCheckedException {
         assert node != null;
 
         Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT));
@@ -2136,6 +2558,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             try {
                 GridCommunicationClient client = createShmemClient(
                     node,
+                    connIdx,
                     shmemPort);
 
                 if (log.isDebugEnabled())
@@ -2158,7 +2581,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         connectGate.enter();
 
         try {
-            GridCommunicationClient client = createTcpClient(node);
+            GridCommunicationClient client = createTcpClient(node, connIdx);
 
             if (log.isDebugEnabled())
                 log.debug("TCP client created: " + client);
@@ -2173,10 +2596,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /**
      * @param node Node.
      * @param port Port.
+     * @param connIdx Connection index.
      * @return Client.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node,
+    @Nullable private GridCommunicationClient createShmemClient(ClusterNode node,
+        int connIdx,
         Integer port) throws IgniteCheckedException {
         int attempt = 1;
 
@@ -2190,7 +2615,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             GridCommunicationClient client;
 
             try {
-                client = new GridShmemCommunicationClient(metricsLsnr,
+                client = new GridShmemCommunicationClient(
+                    connIdx,
+                    metricsLsnr,
                     port,
                     timeoutHelper.nextTimeoutChunk(connTimeout),
                     log,
@@ -2211,7 +2638,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             try {
-                safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0), null);
+                safeHandshake(client,
+                    null,
+                    node.id(),
+                    timeoutHelper.nextTimeoutChunk(connTimeout0),
+                    null,
+                    null);
             }
             catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
                 client.forceClose();
@@ -2270,10 +2702,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      */
     private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) {
         if (slowClientQueueLimit > 0 && msgQueueSize > slowClientQueueLimit) {
-            UUID id = ses.meta(NODE_ID_META);
+            ConnectionKey id = ses.meta(CONN_IDX_META);
 
             if (id != null) {
-                ClusterNode node = getSpiContext().node(id);
+                ClusterNode node = getSpiContext().node(id.nodeId);
 
                 if (node != null && node.isClient()) {
                     String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " +
@@ -2283,11 +2715,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         ", clientNode=" + node +
                         ", slowClientQueueLimit=" + slowClientQueueLimit + ']';
 
-                    U.quietAndWarn(
-                        log,
-                        msg);
+                    U.quietAndWarn(log, msg);
 
-                    getSpiContext().failNode(id, msg);
+                    getSpiContext().failNode(id.nodeId(), msg);
                 }
             }
         }
@@ -2297,10 +2727,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * Establish TCP connection to remote node and returns client.
      *
      * @param node Remote node.
+     * @param connIdx Connection index.
      * @return Client.
      * @throws IgniteCheckedException If failed.
      */
-    protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException {
+    protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
         Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS));
         Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
         Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
@@ -2368,7 +2799,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             "(node left topology): " + node);
                     }
 
-                    GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(node);
+                    ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);
+
+                    GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);
 
                     if (!recoveryDesc.reserve()) {
                         U.closeQuiet(ch);
@@ -2395,11 +2828,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             sslMeta.sslEngine(sslEngine);
                         }
 
+                        Integer handshakeConnIdx = useMultipleConnections(node) ? connIdx : null;
+
                         rcvCnt = safeHandshake(ch,
                             recoveryDesc,
                             node.id(),
                             timeoutHelper.nextTimeoutChunk(connTimeout0),
-                            sslMeta);
+                            sslMeta,
+                            handshakeConnIdx);
 
                         if (rcvCnt == -1)
                             return null;
@@ -2410,7 +2846,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
 
                     try {
-                        meta.put(NODE_ID_META, node.id());
+                        meta.put(CONN_IDX_META, connKey);
 
                         if (recoveryDesc != null) {
                             recoveryDesc.onHandshake(rcvCnt);
@@ -2420,7 +2856,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         GridNioSession ses = nioSrvr.createSession(ch, meta).get();
 
-                        client = new GridTcpNioCommunicationClient(ses, log);
+                        client = new GridTcpNioCommunicationClient(connIdx, ses, log);
 
                         conn = true;
                     }
@@ -2564,6 +3000,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * @param rmtNodeId Remote node.
      * @param timeout Timeout for handshake.
      * @param sslMeta Session meta.
+     * @param handshakeConnIdx Non null connection index if need send it in handshake.
      * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout.
      * @return Handshake response.
      */
@@ -2573,7 +3010,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         @Nullable GridNioRecoveryDescriptor recovery,
         UUID rmtNodeId,
         long timeout,
-        GridSslMeta sslMeta
+        GridSslMeta sslMeta,
+        @Nullable Integer handshakeConnIdx
     ) throws IgniteCheckedException {
         HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
 
@@ -2655,14 +3093,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
 
                     if (recovery != null) {
-                        HandshakeMessage msg = new HandshakeMessage(locNode.id(),
-                            recovery.incrementConnectCount(),
-                            recovery.received());
+                        HandshakeMessage msg;
+
+                        int msgSize = 33;
+
+                        if (handshakeConnIdx != null) {
+                            msg = new HandshakeMessage2(locNode.id(),
+                                recovery.incrementConnectCount(),
+                                recovery.received(),
+                                handshakeConnIdx);
+
+                            msgSize += 4;
+                        }
+                        else {
+                            msg = new HandshakeMessage(locNode.id(),
+                                recovery.incrementConnectCount(),
+                                recovery.received());
+                        }
 
                         if (log.isDebugEnabled())
                             log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
 
-                        buf = ByteBuffer.allocate(33);
+                        buf = ByteBuffer.allocate(msgSize);
 
                         buf.order(ByteOrder.nativeOrder());
 
@@ -2689,6 +3141,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         else
                             ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
                     }
+
                     if (recovery != null) {
                         if (log.isDebugEnabled())
                             log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
@@ -2818,26 +3271,81 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         U.join(commWorker, log);
 
-        for (GridCommunicationClient client : clients.values())
-            client.forceClose();
+        for (GridCommunicationClient[] clients0 : clients.values()) {
+            for (GridCommunicationClient client : clients0) {
+                if (client != null)
+                    client.forceClose();
+            }
+        }
+    }
+
+    /**
+     * @param node Node.
+     * @param key Connection key.
+     * @return Recovery descriptor for outgoing connection.
+     */
+    private GridNioRecoveryDescriptor outRecoveryDescriptor(ClusterNode node, ConnectionKey key) {
+        if (usePairedConnections(node))
+            return recoveryDescriptor(outRecDescs, true, node, key);
+        else
+            return recoveryDescriptor(recoveryDescs, false, node, key);
     }
 
     /**
      * @param node Node.
-     * @return Recovery receive data for given node.
+     * @param key Connection key.
+     * @return Recovery descriptor for incoming connection.
+     */
+    private GridNioRecoveryDescriptor inRecoveryDescriptor(ClusterNode node, ConnectionKey key) {
+        if (usePairedConnections(node))
+            return recoveryDescriptor(inRecDescs, true, node, key);
+        else
+            return recoveryDescriptor(recoveryDescs, false, node, key);
+    }
+
+    /**
+     * @param node Node.
+     * @return {@code True} if given node supports multiple connections per-node for communication.
+     */
+    private boolean useMultipleConnections(ClusterNode node) {
+        return node.version().compareToIgnoreTimestamp(MULTIPLE_CONN_SINCE_VER) >= 0;
+    }
+
+    /**
+     * @param node Node.
+     * @return {@code True} if can use in/out connection pair for communication.
      */
-    private GridNioRecoveryDescriptor recoveryDescriptor(ClusterNode node) {
-        ClientKey id = new ClientKey(node.id(), node.order());
+    private boolean usePairedConnections(ClusterNode node) {
+        if (usePairedConnections) {
+            Boolean attr = node.attribute(createSpiAttributeName(ATTR_PAIRED_CONN));
+
+            return attr != null && attr;
+        }
+
+        return false;
+    }
 
-        GridNioRecoveryDescriptor recovery = recoveryDescs.get(id);
+    /**
+     * @param recoveryDescs Descriptors map.
+     * @param pairedConnections {@code True} if in/out connections pair is used for communication with node.
+     * @param node Node.
+     * @param key Connection key.
+     * @return Recovery receive data for given node.
+     */
+    private GridNioRecoveryDescriptor recoveryDescriptor(
+        ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs,
+        boolean pairedConnections,
+        ClusterNode node,
+        ConnectionKey key) {
+        GridNioRecoveryDescriptor recovery = recoveryDescs.get(key);
 
         if (recovery == null) {
             int maxSize = Math.max(msgQueueLimit, ackSndThreshold);
 
-            int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 5);
+            int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 128);
 
-            GridNioRecoveryDescriptor old =
-                recoveryDescs.putIfAbsent(id, recovery = new GridNioRecoveryDescriptor(queueLimit, node, log));
+            GridNioRecoveryDescriptor old = recoveryDescs.putIfAbsent(key,
+                recovery = new GridNioRecoveryDescriptor(pairedConnections, queueLimit, node, log));
 
             if (old != null)
                 recovery = old;
@@ -2879,54 +3387,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         return S.toString(TcpCommunicationSpi.class, this);
     }
 
-    /**
-     *
-     */
-    private static class ClientKey {
-        /** */
-        private UUID nodeId;
-
-        /** */
-        private long order;
-
-        /**
-         * @param nodeId Node ID.
-         * @param order Node order.
-         */
-        private ClientKey(UUID nodeId, long order) {
-            this.nodeId = nodeId;
-            this.order = order;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object obj) {
-            if (this == obj)
-                return true;
-
-            if (obj == null || getClass() != obj.getClass())
-                return false;
-
-            ClientKey other = (ClientKey)obj;
-
-            return order == other.order && nodeId.equals(other.nodeId);
-
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = nodeId.hashCode();
-
-            res = 31 * res + (int)(order ^ (order >>> 32));
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(ClientKey.class, this);
-        }
-    }
-
     /** Internal exception class for proper timeout handling. */
     private static class HandshakeTimeoutException extends IgniteCheckedException {
         /** */
@@ -3026,9 +3486,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+                        ConnectionKey connKey = ses.meta(CONN_IDX_META);
 
-                        return rmtNodeId != null ? formatter.writer(rmtNodeId) : null;
+                        return connKey != null ? formatter.writer(connKey.nodeId()) : null;
                     }
                 };
 
@@ -3042,9 +3502,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+                        ConnectionKey connKey = ses.meta(CONN_IDX_META);
 
-                        return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null;
+                        return connKey != null ? formatter.reader(connKey.nodeId(), msgFactory) : null;
                     }
                 };
 
@@ -3125,62 +3585,108 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         private void processIdle() {
             cleanupRecovery();
 
-            for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) {
+            for (Map.Entry<UUID, GridCommunicationClient[]> e : clients.entrySet()) {
                 UUID nodeId = e.getKey();
 
-                GridCommunicationClient client = e.getValue();
+                for (GridCommunicationClient client : e.getValue()) {
+                    if (client == null)
+                        continue;
 
-                ClusterNode node = getSpiContext().node(nodeId);
+                    ClusterNode node = getSpiContext().node(nodeId);
 
-                if (node == null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Forcing close of non-existent node connection: " + nodeId);
+                    if (node == null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Forcing close of non-existent node connection: " + nodeId);
 
-                    client.forceClose();
+                        client.forceClose();
 
-                    clients.remove(nodeId, client);
+                        removeNodeClient(nodeId, client);
 
-                    continue;
-                }
+                        continue;
+                    }
 
-                GridNioRecoveryDescriptor recovery = null;
+                    GridNioRecoveryDescriptor recovery = null;
 
-                if (client instanceof GridTcpNioCommunicationClient) {
-                    recovery = recoveryDescs.get(new ClientKey(node.id(), node.order()));
+                    if (!usePairedConnections(node) && client instanceof GridTcpNioCommunicationClient) {
+                        recovery = recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1));
 
-                    if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
-                        RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
+                        if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
+                            RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
 
-                        if (log.isDebugEnabled())
-                            log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
-                                ", rcvCnt=" + msg.received() + ']');
+                            if (log.isDebugEnabled())
+                                log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
+                                    ", rcvCnt=" + msg.received() + ']');
 
-                        nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
+                            try {
+                                nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
 
-                        recovery.lastAcknowledged(msg.received());
+                                recovery.lastAcknowledged(msg.received());
+                            }
+                            catch (IgniteCheckedException err) {
+                                U.error(log, "Failed to send message: " + err, err);
+                            }
 
-                        continue;
+                            continue;
+                        }
                     }
-                }
 
-                long idleTime = client.getIdleTime();
+                    long idleTime = client.getIdleTime();
+
+                    if (idleTime >= idleConnTimeout) {
+                        if (recovery == null && usePairedConnections(node))
+                            recovery = outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1));
+
+                        if (recovery != null &&
+                            recovery.nodeAlive(getSpiContext().node(nodeId)) &&
+                            !recovery.messagesRequests().isEmpty()) {
+                            if (log.isDebugEnabled())
+                                log.debug("Node connection is idle, but there are unacknowledged messages, " +
+                                    "will wait: " + nodeId);
+
+                            continue;
+                        }
 
-                if (idleTime >= idleConnTimeout) {
-                    if (recovery != null &&
-                        recovery.nodeAlive(getSpiContext().node(nodeId)) &&
-                        !recovery.messagesFutures().isEmpty()) {
                         if (log.isDebugEnabled())
-                            log.debug("Node connection is idle, but there are unacknowledged messages, " +
-                                "will wait: " + nodeId);
+                            log.debug("Closing idle node connection: " + nodeId);
 
-                        continue;
+                        if (client.close() || client.closed())
+                            removeNodeClient(nodeId, client);
                     }
+                }
+            }
 
-                    if (log.isDebugEnabled())
-                        log.debug("Closing idle node connection: " + nodeId);
+            for (GridNioSession ses : nioSrvr.sessions()) {
+                GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();
+
+                if (recovery != null && usePairedConnections(recovery.node())) {
+                    assert ses.accepted() : ses;
+
+                    sendAckOnTimeout(recovery, ses);
+                }
+            }
+        }
+
+        /**
+         * @param recovery Recovery descriptor.
+         * @param ses Session.
+         */
+        private void sendAckOnTimeout(GridNioRecoveryDescriptor recovery, GridNioSession ses) {
+            if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
+                RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Send recovery acknowledgement on timeout [rmtNode=" + recovery.node().id() +
+                        ", rcvCnt=" + msg.received() +
+                        ", lastAcked=" + recovery.lastAcknowledged() + ']');
+                }
+
+                try {
+                    nioSrvr.sendSystem(ses, msg);
 
-                    if (client.close() || client.closed())
-                        clients.remove(nodeId, client);
+                    recovery.lastAcknowledged(msg.received());
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send message: " + e, e);
                 }
             }
         }
@@ -3189,15 +3695,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
          *
          */
         private void cleanupRecovery() {
-            Set<ClientKey> left = null;
+            cleanupRecovery(recoveryDescs);
+            cleanupRecovery(inRecDescs);
+            cleanupRecovery(outRecDescs);
+        }
+
+        /**
+         * @param recoveryDescs Recovery descriptors to cleanup.
+         */
+        private void cleanupRecovery(ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs) {
+            Set<ConnectionKey> left = null;
 
-            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
+            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
                 if (left != null && left.contains(e.getKey()))
                     continue;
 
-                GridNioRecoveryDescriptor recoverySnd = e.getValue();
+                GridNioRecoveryDescriptor recoveryDesc = e.getValue();
 
-                if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) {
+                if (!recoveryDesc.nodeAlive(getSpiContext().node(e.getKey().nodeId()))) {
                     if (left == null)
                         left = new HashSet<>();
 
@@ -3208,11 +3723,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             if (left != null) {
                 assert !left.isEmpty();
 
-                for (ClientKey id : left) {
-                    GridNioRecoveryDescriptor recoverySnd = recoveryDescs.get(id);
+                for (ConnectionKey id : left) {
+                    GridNioRecoveryDescriptor recoveryDesc = recoveryDescs.get(id);
 
-                    if (recoverySnd != null && recoverySnd.onNodeLeft())
-                        recoveryDescs.remove(id);
+                    if (recoveryDesc != null && recoveryDesc.onNodeLeft())
+                        recoveryDescs.remove(id, recoveryDesc);
                 }
             }
         }
@@ -3221,45 +3736,43 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
          * @param sesInfo Disconnected session information.
          */
         private void processDisconnect(DisconnectedSessionInfo sesInfo) {
-            if (sesInfo.reconnect) {
-                GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;
-
-                ClusterNode node = recoveryDesc.node();
+            GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;
 
-                if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
-                    return;
+            ClusterNode node = recoveryDesc.node();
 
-                try {
-                    if (log.isDebugEnabled())
-                        log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
+            if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
+                return;
 
-                    GridCommunicationClient client = reserveClient(node);
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
 
-                    client.release();
-                }
-                catch (IgniteCheckedException | IgniteException e) {
-                    try {
-                        if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) {
-                            if (log.isDebugEnabled())
-                                log.debug("Recovery reconnect failed, will retry " +
-                                    "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+                GridCommunicationClient client = reserveClient(node, sesInfo.connIdx);
 
-                            addProcessDisconnectRequest(sesInfo);
-                        }
-                        else {
-                            if (log.isDebugEnabled())
-                                log.debug("Recovery reconnect failed, " +
-                                    "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+                client.release();
+            }
+            catch (IgniteCheckedException | IgniteException e) {
+                try {
+                    if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) {
+                        if (log.isDebugEnabled())
+                            log.debug("Recovery reconnect failed, will retry " +
+                                "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
 
-                            onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]",
-                                e);
-                        }
+                        addProcessDisconnectRequest(sesInfo);
                     }
-                    catch (IgniteClientDisconnectedException e0) {
+                    else {
                         if (log.isDebugEnabled())
-                            log.debug("Failed to ping node, client disconnected.");
+                            log.debug("Recovery reconnect failed, " +
+                                "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+
+                        onE

<TRUNCATED>