You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/12 09:39:58 UTC

[3/7] incubator-ignite git commit: # ignite-21

# ignite-21


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dae4b942
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dae4b942
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dae4b942

Branch: refs/heads/master
Commit: dae4b9428cfe04fb2d0f78020e7f0ea29eb37c04
Parents: 20872dc
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 10 10:41:56 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 10 13:22:50 2014 +0300

----------------------------------------------------------------------
 .../rest/protocols/tcp/GridMockNioSession.java  |   11 +
 .../communication/tcp/TcpCommunicationSpi.java  | 1151 +++++++++++++++---
 .../tcp/TcpCommunicationSpiMBean.java           |   52 +-
 .../discovery/tcp/TcpDiscoverySpiAdapter.java   |    3 +-
 .../GridTcpCommunicationMessageAdapter.java     |    7 +
 .../GridTcpCommunicationMessageFactory.java     |   10 +-
 .../grid/util/nio/GridCommunicationClient.java  |    3 +-
 .../grid/util/nio/GridNioFinishedFuture.java    |    5 +
 .../gridgain/grid/util/nio/GridNioFuture.java   |   15 +-
 .../grid/util/nio/GridNioFutureImpl.java        |    7 +-
 .../util/nio/GridNioRecoveryDescriptor.java     |  356 ++++++
 .../gridgain/grid/util/nio/GridNioServer.java   |  167 ++-
 .../gridgain/grid/util/nio/GridNioSession.java  |   10 +
 .../grid/util/nio/GridNioSessionImpl.java       |   10 +
 .../util/nio/GridSelectorNioSessionImpl.java    |   97 ++
 .../util/nio/GridShmemCommunicationClient.java  |    4 +-
 .../util/nio/GridTcpCommunicationClient.java    |    4 +-
 .../util/nio/GridTcpNioCommunicationClient.java |   39 +-
 .../GridAbstractCommunicationSelfTest.java      |   85 +-
 .../spi/communication/GridTestMessage.java      |   15 +-
 .../GridTcpCommunicationSpiAbstractTest.java    |   26 +-
 ...mmunicationSpiConcurrentConnectSelfTest.java |  398 ++++++
 .../GridTcpCommunicationSpiConfigSelfTest.java  |    4 +
 ...cpCommunicationSpiMultithreadedSelfTest.java |   77 +-
 ...pCommunicationSpiMultithreadedShmemTest.java |    2 +-
 ...dTcpCommunicationSpiRecoveryAckSelfTest.java |  426 +++++++
 ...GridTcpCommunicationSpiRecoverySelfTest.java |  713 +++++++++++
 .../GridTcpCommunicationSpiShmemSelfTest.java   |    2 +-
 ...cpCommunicationSpiTcpNoDelayOffSelfTest.java |   20 +
 .../managers/GridManagerStopSelfTest.java       |    2 +
 .../grid/spi/GridSpiStartStopAbstractTest.java  |    2 +
 .../nio/impl/GridNioFilterChainSelfTest.java    |   10 +
 .../gridgain/testframework/GridTestNode.java    |   12 +-
 .../GridSpiCommunicationSelfTestSuite.java      |    8 +-
 34 files changed, 3434 insertions(+), 319 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/clients/src/test/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMockNioSession.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMockNioSession.java b/modules/clients/src/test/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMockNioSession.java
index 7071ed5..bf82f22 100644
--- a/modules/clients/src/test/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMockNioSession.java
+++ b/modules/clients/src/test/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMockNioSession.java
@@ -10,6 +10,7 @@ package org.gridgain.grid.kernal.processors.rest.protocols.tcp;
 
 import org.gridgain.grid.util.lang.*;
 import org.gridgain.grid.util.nio.*;
+import org.jetbrains.annotations.*;
 
 import java.net.*;
 
@@ -132,4 +133,14 @@ public class GridMockNioSession extends GridMetadataAwareAdapter implements Grid
     @Override public boolean readsPaused() {
         return false;
     }
+
+    /** {@inheritDoc} */
+    @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index a80421e..1911b5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -24,6 +24,7 @@ import org.gridgain.grid.kernal.managers.eventstorage.*;
 import org.apache.ignite.spi.communication.*;
 import org.gridgain.grid.util.*;
 import org.gridgain.grid.util.direct.*;
+import org.gridgain.grid.util.future.*;
 import org.gridgain.grid.util.ipc.*;
 import org.gridgain.grid.util.ipc.shmem.*;
 import org.gridgain.grid.util.lang.*;
@@ -51,7 +52,7 @@ import static org.apache.ignite.events.IgniteEventType.*;
  * TCP/IP protocol and Java NIO to communicate with other nodes.
  * <p>
  * To enable communication with other nodes, this SPI adds {@link #ATTR_ADDRS}
- * and {@link #ATTR_PORT} local node attributes (see {@link org.apache.ignite.cluster.ClusterNode#attributes()}.
+ * and {@link #ATTR_PORT} local node attributes (see {@link ClusterNode#attributes()}).
  * <p>
  * At startup, this SPI tries to start listening to local port specified by
  * {@link #setLocalPort(int)} method. If local port is occupied, then SPI will
@@ -84,9 +85,7 @@ import static org.apache.ignite.events.IgniteEventType.*;
  * <li>Direct or heap buffer allocation for sending (see {@link #setDirectSendBuffer(boolean)})</li>
  * <li>Count of selectors and selector threads for NIO server (see {@link #setSelectorsCount(int)})</li>
  * <li>{@code TCP_NODELAY} socket option for sockets (see {@link #setTcpNoDelay(boolean)})</li>
- * <li>Async message sending (see {@link #setAsyncSend(boolean)})</li>
  * <li>Message queue limit (see {@link #setMessageQueueLimit(int)})</li>
- * <li>Dual socket connection (see {@link #setDualSocketConnection(boolean)})</li>
  * <li>Minimum buffered message count (see {@link #setMinimumBufferedMessageCount(int)})</li>
  * <li>Buffer size ratio (see {@link #setBufferSizeRatio(double)})</li>
  * <li>Connect timeout (see {@link #setConnectTimeout(long)})</li>
@@ -95,6 +94,9 @@ import static org.apache.ignite.events.IgniteEventType.*;
  * <li>Local port to accept shared memory connections (see {@link #setSharedMemoryPort(int)})</li>
  * <li>Socket receive buffer size (see {@link #setSocketReceiveBuffer(int)})</li>
  * <li>Socket send buffer size (see {@link #setSocketSendBuffer(int)})</li>
+ * <li>Socket write timeout (see {@link #setSocketWriteTimeout(long)})</li>
+ * <li>Number of received messages after which acknowledgment is sent (see {@link #setAckSendThreshold(int)})</li>
+ * <li>Maximum number of unacknowledged messages (see {@link #setUnacknowledgedMessagesBufferSize(int)})</li>
  * </ul>
  * <h2 class="header">Java Example</h2>
  * GridTcpCommunicationSpi is used by default and should be explicitly configured
@@ -131,7 +133,7 @@ import static org.apache.ignite.events.IgniteEventType.*;
  * <img src="http://www.gridgain.com/images/spring-small.png">
  * <br>
  * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
- * @see org.apache.ignite.spi.communication.CommunicationSpi
+ * @see CommunicationSpi
  */
 @IgniteSpiMultipleInstancesSupport(true)
 @IgniteSpiConsistencyChecked(optional = false)
@@ -187,9 +189,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Default message queue limit per connection (for incoming and outgoing . */
     public static final int DFLT_MSG_QUEUE_LIMIT = GridNioServer.DFLT_SEND_QUEUE_LIMIT;
 
-    /** Default value for dualSocketConnection flag. */
-    public static final boolean DFLT_DUAL_SOCKET_CONNECTION = false;
-
     /**
      * Default count of selectors for TCP server equals to
      * {@code "Math.min(4, Runtime.getRuntime().availableProcessors())"}.
@@ -211,6 +210,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Default value for {@code TCP_NODELAY} socket option (value is <tt>true</tt>). */
     public static final boolean DFLT_TCP_NODELAY = true;
 
+    /** Default received messages threshold for sending ack. */
+    public static final int DFLT_ACK_SND_THRESHOLD = 512;
+
+    /** Default socket write timeout. */
+    public static final long DFLT_SOCK_WRITE_TIMEOUT = GridNioServer.DFLT_SES_WRITE_TIMEOUT;
+
     /** No-op runnable. */
     private static final IgniteRunnable NOOP = new IgniteRunnable() {
         @Override public void run() {
@@ -221,9 +226,27 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Node ID message type. */
     public static final byte NODE_ID_MSG_TYPE = -1;
 
+    /** */
+    public static final byte RECOVERY_LAST_ID_MSG_TYPE = -2;
+
+    /** */
+    public static final byte HANDSHAKE_MSG_TYPE = -3;
+
     /** Server listener. */
     private final GridNioServerListener<GridTcpCommunicationMessageAdapter> srvLsnr =
         new GridNioServerListenerAdapter<GridTcpCommunicationMessageAdapter>() {
+            @Override public void onSessionWriteTimeout(GridNioSession ses) {
+                LT.warn(log, null, "Communication SPI Session write timed out (consider increasing " +
+                    "'socketWriteTimeout' " + "configuration property) [remoteAddr=" + ses.remoteAddress() +
+                    ", writeTimeout=" + sockWriteTimeout + ']');
+
+                if (log.isDebugEnabled())
+                    log.debug("Closing communication SPI session on write timeout [remoteAddr=" + ses.remoteAddress() +
+                        ", writeTimeout=" + sockWriteTimeout + ']');
+
+                ses.close();
+            }
+
             @Override public void onConnected(GridNioSession ses) {
                 if (ses.accepted()) {
                     if (log.isDebugEnabled())
@@ -231,8 +254,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     ses.send(nodeIdMsg);
                 }
-                else
-                    assert asyncSnd;
             }
 
             @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
@@ -243,9 +264,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     if (rmv instanceof GridTcpNioCommunicationClient &&
                         ((GridTcpNioCommunicationClient)rmv).session() == ses &&
-                        clients.remove(id, rmv))
+                        clients.remove(id, rmv)) {
                         rmv.forceClose();
 
+                        if (!stopping) {
+                            GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor();
+
+                            if (recoveryData != null) {
+                                if (recoveryData.nodeAlive(getSpiContext().node(id))) {
+                                    if (!recoveryData.messagesFutures().isEmpty()) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Session was closed but there are unacknowledged messages, " +
+                                                "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']');
+
+                                        recoveryWorker.addReconnectRequest(recoveryData);
+                                    }
+                                }
+                                else
+                                    recoveryData.onNodeLeft();
+                            }
+                        }
+                    }
+
                     CommunicationListener<GridTcpCommunicationMessageAdapter> lsnr0 = lsnr;
 
                     if (lsnr0 != null)
@@ -259,20 +299,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 if (sndId == null) {
                     assert ses.accepted();
 
-                    assert msg instanceof NodeIdMessage;
+                    if (msg instanceof NodeIdMessage)
+                        sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
+                    else {
+                        assert msg instanceof HandshakeMessage : msg;
 
-                    sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
+                        sndId = ((HandshakeMessage)msg).nodeId();
+                    }
 
                     if (log.isDebugEnabled())
                         log.debug("Remote node ID received: " + sndId);
 
-                    UUID old = ses.addMeta(NODE_ID_META, sndId);
+                    final UUID old = ses.addMeta(NODE_ID_META, sndId);
 
                     assert old == null;
 
-                    IgniteProductVersion locVer = getSpiContext().localNode().version();
-
-                    ClusterNode rmtNode = getSpiContext().node(sndId);
+                    final ClusterNode rmtNode = getSpiContext().node(sndId);
 
                     if (rmtNode == null) {
                         ses.close();
@@ -280,25 +322,110 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         return;
                     }
 
+                    ClusterNode locNode = getSpiContext().localNode();
+
+                    IgniteProductVersion locVer = locNode.version();
+
                     IgniteProductVersion rmtVer = rmtNode.version();
 
                     if (!locVer.equals(rmtVer))
                         ses.addMeta(GridNioServer.DIFF_VER_NODE_ID_META_KEY, sndId);
 
-                    if (asyncSnd && ses.remoteAddress() != null && !dualSockConn) {
-                        Object sync = locks.tryLock(sndId);
+                    if (ses.remoteAddress() == null)
+                        return;
+
+                    GridCommunicationClient oldClient = clients.get(sndId);
+
+                    boolean hasShmemClient = false;
+
+                    if (oldClient != null) {
+                        if (oldClient instanceof GridTcpNioCommunicationClient) {
+                            if (log.isDebugEnabled())
+                                log.debug("Received incoming connection when already connected " +
+                                    "to this node, rejecting [locNode=" + locNode.id() +
+                                    ", rmtNode=" + sndId + ']');
+
+                            ses.send(new RecoveryLastReceivedMessage(-1));
+
+                            return;
+                        }
+                        else {
+                            assert oldClient instanceof GridShmemCommunicationClient;
+
+                            hasShmemClient = true;
+                        }
+                    }
+
+                    GridFutureAdapterEx<GridCommunicationClient> fut = new GridFutureAdapterEx<>();
+
+                    GridFutureAdapterEx<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut);
+
+                    assert msg instanceof HandshakeMessage : msg;
+
+                    HandshakeMessage msg0 = (HandshakeMessage)msg;
+
+                    final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode);
+
+                    if (oldFut == null) {
+                        oldClient = clients.get(sndId);
+
+                        if (oldClient != null) {
+                            if (oldClient instanceof GridTcpNioCommunicationClient) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Received incoming connection when already connected " +
+                                        "to this node, rejecting [locNode=" + locNode.id() +
+                                        ", rmtNode=" + sndId + ']');
+
+                                ses.send(new RecoveryLastReceivedMessage(-1));
+
+                                return;
+                            }
+                            else {
+                                assert oldClient instanceof GridShmemCommunicationClient;
+
+                                hasShmemClient = true;
+                            }
+                        }
+
+                        boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
+                            new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
 
-                        if (sync != null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Received incoming connection from remote node " +
+                                "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']');
+
+                        if (reserved) {
                             try {
-                                if (clients.get(sndId) == null) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Will reuse session for node: " + sndId);
+                                GridTcpNioCommunicationClient client =
+                                    connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
 
-                                    clients.put(sndId, new GridTcpNioCommunicationClient(ses));
-                                }
+                                fut.onDone(client);
                             }
                             finally {
-                                locks.unlock(sndId, sync);
+                                clientFuts.remove(rmtNode.id(), fut);
+                            }
+                        }
+                    }
+                    else {
+                        if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Received incoming connection from remote node while " +
+                                    "connecting to this node, rejecting [locNode=" + locNode.id() +
+                                    ", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() +
+                                    ", rmtNodeOrder=" + rmtNode.order() + ']');
+                            }
+
+                            ses.send(new RecoveryLastReceivedMessage(-1));
+                        }
+                        else {
+                            boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
+                                new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
+
+                            if (reserved) {
+                                GridTcpNioCommunicationClient client =
+                                    connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
+
+                                fut.onDone(client);
                             }
                         }
                     }
@@ -306,6 +433,35 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 else {
                     rcvdMsgsCnt.increment();
 
+                    GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor();
+
+                    if (recovery != null) {
+                        if (msg instanceof RecoveryLastReceivedMessage) {
+                            RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg;
+
+                            if (log.isDebugEnabled())
+                                log.debug("Received recovery acknowledgement [rmtNode=" + sndId +
+                                    ", rcvCnt=" + msg0.received() + ']');
+
+                            recovery.ackReceived(msg0.received());
+
+                            return;
+                        }
+                        else {
+                            long rcvCnt = recovery.onReceived();
+
+                            if (rcvCnt % ackSndThreshold == 0) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Send recovery acknowledgement [rmtNode=" + sndId +
+                                        ", rcvCnt=" + rcvCnt + ']');
+
+                                nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(rcvCnt));
+
+                                recovery.lastAcknowledged(rcvCnt);
+                            }
+                        }
+                    }
+
                     IgniteRunnable c;
 
                     if (msgQueueLimit > 0) {
@@ -328,6 +484,135 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     notifyListener(sndId, msg, c);
                 }
             }
+
+            /**
+             * @param recovery Recovery descriptor.
+             * @param ses Session.
+             * @param node Node.
+             * @param rcvCnt Number of received messages.
+             * @param sndRes If {@code true} sends response for recovery handshake.
+             * @param createClient If {@code true} creates NIO communication client.
+             * @return Client.
+             */
+            private GridTcpNioCommunicationClient connected(
+                GridNioRecoveryDescriptor recovery,
+                GridNioSession ses,
+                ClusterNode node,
+                long rcvCnt,
+                boolean sndRes,
+                boolean createClient) {
+                recovery.onHandshake(rcvCnt);
+
+                ses.recoveryDescriptor(recovery);
+
+                nioSrvr.resend(ses);
+
+                if (sndRes)
+                    nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.receivedCount()));
+
+                recovery.connected();
+
+                GridTcpNioCommunicationClient client = null;
+
+                if (createClient) {
+                    client = new GridTcpNioCommunicationClient(ses, log);
+
+                    GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client);
+
+                    assert oldClient == null;
+                }
+
+                return client;
+            }
+
+            /**
+             *
+             */
+            @SuppressWarnings("PackageVisibleInnerClass")
+            class ConnectClosure implements IgniteInClosure<Boolean> {
+                /** */
+                private static final long serialVersionUID = 0L;
+
+                /** */
+                private final GridNioSession ses;
+
+                /** */
+                private final GridNioRecoveryDescriptor recoveryDesc;
+
+                /** */
+                private final ClusterNode rmtNode;
+
+                /** */
+                private final HandshakeMessage msg;
+
+                /** */
+                private final GridFutureAdapterEx<GridCommunicationClient> fut;
+
+                /** */
+                private final boolean createClient;
+
+                /**
+                 * @param ses Incoming session.
+                 * @param recoveryDesc Recovery descriptor.
+                 * @param rmtNode Remote node.
+                 * @param msg Handshake message.
+                 * @param createClient If {@code true} creates NIO communication client.
+                 * @param fut Connect future.
+                 */
+                ConnectClosure(GridNioSession ses,
+                    GridNioRecoveryDescriptor recoveryDesc,
+                    ClusterNode rmtNode,
+                    HandshakeMessage msg,
+                    boolean createClient,
+                    GridFutureAdapterEx<GridCommunicationClient> fut) {
+                    this.ses = ses;
+                    this.recoveryDesc = recoveryDesc;
+                    this.rmtNode = rmtNode;
+                    this.msg = msg;
+                    this.createClient = createClient;
+                    this.fut = fut;
+                }
+
+                /** {@inheritDoc} */
+                @Override public void apply(Boolean success) {
+                    if (success) {
+                        IgniteInClosure<GridNioFuture<?>> lsnr = new IgniteInClosure<GridNioFuture<?>>() {
+                            @Override public void apply(GridNioFuture<?> msgFut) {
+                                try {
+                                    msgFut.get();
+
+                                    GridTcpNioCommunicationClient client =
+                                        connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient);
+
+                                    fut.onDone(client);
+                                }
+                                catch (GridException | IOException e) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to send recovery handshake " +
+                                            "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');
+
+                                    recoveryDesc.release();
+
+                                    fut.onDone();
+                                }
+                                finally {
+                                    clientFuts.remove(rmtNode.id(), fut);
+                                }
+                            }
+                        };
+
+                        nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.receivedCount()), lsnr);
+                    }
+                    else {
+                        try {
+                            fut.onDone();
+                        }
+                        finally {
+                            clientFuts.remove(rmtNode.id(), fut);
+                        }
+                    }
+                }
+            }
         };
 
     /** Logger. */
@@ -402,9 +687,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Buffer size ratio. */
     private double bufSizeRatio = IgniteSystemProperties.getDouble(GG_COMMUNICATION_BUF_RESIZE_RATIO, 0.8);
 
-    /** Dual socket connection flag. */
-    private boolean dualSockConn = DFLT_DUAL_SOCKET_CONNECTION;
-
     /** NIO server. */
     private GridNioServer<GridTcpCommunicationMessageAdapter> nioSrvr;
 
@@ -414,8 +696,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** {@code TCP_NODELAY} option value for created sockets. */
     private boolean tcpNoDelay = DFLT_TCP_NODELAY;
 
-    /** Use async client flag. */
-    private boolean asyncSnd = true;
+    /** Number of received messages after which acknowledgment is sent. */
+    private int ackSndThreshold = DFLT_ACK_SND_THRESHOLD;
+
+    /** Maximum number of unacknowledged messages. */
+    private int unackedMsgsBufSize;
+
+    /** Socket write timeout. */
+    private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT;
 
     /** Shared memory accept worker. */
     private ShmemAcceptWorker shmemAcceptWorker;
@@ -429,6 +717,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Socket timeout worker. */
     private SocketTimeoutWorker sockTimeoutWorker;
 
+    /** Recovery worker. */
+    private RecoveryWorker recoveryWorker;
+
     /** Shared memory workers. */
     private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>();
 
@@ -468,6 +759,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Context initialization latch. */
     private final CountDownLatch ctxInitLatch = new CountDownLatch(1);
 
+    /** Stopping flag. */
+    private volatile boolean stopping;
+
     /** metrics listener. */
     private final GridNioMetricsListener metricsLsnr = new GridNioMetricsListener() {
         @Override public void onBytesSent(int bytesCnt) {
@@ -479,8 +773,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         }
     };
 
-    /** Locks. */
-    private final GridKeyLock locks = new GridKeyLock();
+    /** Client connect futures. */
+    private final ConcurrentMap<UUID, GridFutureAdapterEx<GridCommunicationClient>> clientFuts =
+        GridConcurrentFactory.newMap();
+
+    /** */
+    private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap();
 
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@@ -696,6 +994,58 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         return idleConnTimeout;
     }
 
+    /** {@inheritDoc} */
+    @Override public long getSocketWriteTimeout() {
+        return sockWriteTimeout;
+    }
+
+    /**
+     * Sets socket write timeout for TCP connection. If message can not be written to
+     * socket within this time then connection is closed and reconnect is attempted.
+     * <p>
+     * Defaults to {@link #DFLT_SOCK_WRITE_TIMEOUT}.
+     *
+     * @param sockWriteTimeout Socket write timeout for TCP connection.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setSocketWriteTimeout(long sockWriteTimeout) {
+        this.sockWriteTimeout = sockWriteTimeout;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getAckSendThreshold() {
+        return ackSndThreshold;
+    }
+
+    /**
+     * Sets number of received messages per connection to node after which acknowledgment message is sent.
+     * <p>
+     * Defaults to {@link #DFLT_ACK_SND_THRESHOLD}.
+     *
+     * @param ackSndThreshold Number of received messages after which acknowledgment is sent.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setAckSendThreshold(int ackSndThreshold) {
+        this.ackSndThreshold = ackSndThreshold;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getUnacknowledgedMessagesBufferSize() {
+        return unackedMsgsBufSize;
+    }
+
+    /**
+     * Sets maximum number of stored unacknowledged messages per connection to node.
+     * If number of unacknowledged messages exceeds this number then connection to node is
+     * closed and reconnect is attempted.
+     *
+     * @param unackedMsgsBufSize Maximum number of unacknowledged messages.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setUnacknowledgedMessagesBufferSize(int unackedMsgsBufSize) {
+        this.unackedMsgsBufSize = unackedMsgsBufSize;
+    }
+
     /**
      * Sets connection buffer size. If set to {@code 0} connection buffer is disabled.
      * <p>
@@ -838,24 +1188,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         return selectorsCnt;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean isAsyncSend() {
-        return asyncSnd;
-    }
-
-    /**
-     * Sets flag defining whether asynchronous (NIO) or synchronous (blocking) IO
-     * should be used to send messages.
-     * <p>
-     * If not provided, default value is {@code true}.
-     *
-     * @param asyncSnd {@code True} if asynchronous IO should be used to send messages.
-     */
-    @IgniteSpiConfiguration(optional = true)
-    public void setAsyncSend(boolean asyncSnd) {
-        this.asyncSnd = asyncSnd;
-    }
-
     /**
      * Sets value for {@code TCP_NODELAY} socket option. Each
      * socket will be opened using provided value.
@@ -915,33 +1247,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     }
 
     /**
-     * Sets flag indicating whether dual-socket connection between nodes should be enforced. If set to
-     * {@code true}, two separate connections will be established between communicating nodes: one for outgoing
-     * messages, and one for incoming. When set to {@code false}, single {@code TCP} connection will be used
-     * for both directions.
-     * <p>
-     * This flag is useful on some operating systems, when {@code TCP_NODELAY} flag is disabled and
-     * messages take too long to get delivered.
-     * <p>
-     * If not provided, default is {@code false}.
-     *
-     * @param dualSockConn Whether dual-socket connection should be enforced.
-     */
-    @IgniteSpiConfiguration(optional = true)
-    public void setDualSocketConnection(boolean dualSockConn) {
-        this.dualSockConn = dualSockConn;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isDualSocketConnection() {
-        return dualSockConn;
-    }
-
-    /**
      * Sets message queue limit for incoming and outgoing messages.
      * <p>
-     * This parameter only used when {@link #isAsyncSend()} set to {@code true}.
-     * <p>
      * When set to positive number send queue is limited to the configured value.
      * {@code 0} disables the size limitations.
      * <p>
@@ -963,7 +1270,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * Sets the minimum number of messages for this SPI, that are buffered
      * prior to sending.
      * <p>
-     * Defaults to either {@code 512} or {@link org.apache.ignite.IgniteSystemProperties#GG_MIN_BUFFERED_COMMUNICATION_MSG_CNT}
+     * Defaults to either {@code 512} or {@link IgniteSystemProperties#GG_MIN_BUFFERED_COMMUNICATION_MSG_CNT}
      * system property (if specified).
      *
      * @param minBufferedMsgCnt Minimum buffered message count.
@@ -982,7 +1289,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * Sets the buffer size ratio for this SPI. As messages are sent,
      * the buffer size is adjusted using this ratio.
      * <p>
-     * Defaults to either {@code 0.8} or {@link org.apache.ignite.IgniteSystemProperties#GG_COMMUNICATION_BUF_RESIZE_RATIO}
+     * Defaults to either {@code 0.8} or {@link IgniteSystemProperties#GG_COMMUNICATION_BUF_RESIZE_RATIO}
      * system property (if specified).
      *
      * @param bufSizeRatio Buffer size ratio.
@@ -1064,6 +1371,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         assertParameter(bufSizeRatio > 0 && bufSizeRatio < 1, "bufSizeRatio > 0 && bufSizeRatio < 1");
         assertParameter(connTimeout >= 0, "connTimeout >= 0");
         assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
+        assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0");
+        assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0");
+        assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0");
+
+        if (unackedMsgsBufSize > 0) {
+            assertParameter(unackedMsgsBufSize >= msgQueueLimit * 5,
+                "Specified 'unackedMsgsBufSize' is too low, it should be at least 'msgQueueLimit * 5'.");
+
+            assertParameter(unackedMsgsBufSize >= ackSndThreshold * 5,
+                "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'.");
+        }
 
         try {
             locHost = U.resolveLocalHost(locAddr);
@@ -1125,18 +1443,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             log.debug(configInfo("connBufSize", connBufSize));
             log.debug(configInfo("connBufFlushFreq", connBufFlushFreq));
             log.debug(configInfo("selectorsCnt", selectorsCnt));
-            log.debug(configInfo("asyncSend", asyncSnd));
             log.debug(configInfo("tcpNoDelay", tcpNoDelay));
             log.debug(configInfo("sockSndBuf", sockSndBuf));
             log.debug(configInfo("sockRcvBuf", sockRcvBuf));
             log.debug(configInfo("shmemPort", shmemPort));
             log.debug(configInfo("msgQueueLimit", msgQueueLimit));
-            log.debug(configInfo("dualSockConn", dualSockConn));
             log.debug(configInfo("minBufferedMsgCnt", minBufferedMsgCnt));
             log.debug(configInfo("bufSizeRatio", bufSizeRatio));
             log.debug(configInfo("connTimeout", connTimeout));
             log.debug(configInfo("maxConnTimeout", maxConnTimeout));
             log.debug(configInfo("reconCnt", reconCnt));
+            log.debug(configInfo("sockWriteTimeout", sockWriteTimeout));
+            log.debug(configInfo("ackSndThreshold", ackSndThreshold));
+            log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize));
         }
 
         if (connBufSize > 8192)
@@ -1162,6 +1481,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         idleClientWorker.start();
 
+        recoveryWorker = new RecoveryWorker();
+
+        recoveryWorker.start();
+
         if (connBufSize > 0) {
             clientFlushWorker = new ClientFlushWorker();
 
@@ -1242,6 +1565,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         .directMode(true)
                         .metricsListener(metricsLsnr)
                         .messageWriter(msgWriter)
+                        .writeTimeout(sockWriteTimeout)
                         .filters(new GridNioCodecFilter(new GridDirectParser(msgReader, this), log, true),
                             new GridConnectionBytesVerifyFilter(log))
                         .build();
@@ -1321,6 +1645,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /** {@inheritDoc} */
     @Override public void spiStop() throws IgniteSpiException {
+        assert stopping;
+
         unregisterMBean();
 
         // Stop TCP server.
@@ -1333,10 +1659,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         U.interrupt(idleClientWorker);
         U.interrupt(clientFlushWorker);
         U.interrupt(sockTimeoutWorker);
+        U.interrupt(recoveryWorker);
 
         U.join(idleClientWorker, log);
         U.join(clientFlushWorker, log);
         U.join(sockTimeoutWorker, log);
+        U.join(recoveryWorker, log);
 
         U.cancel(shmemWorkers);
         U.join(shmemWorkers, log);
@@ -1360,6 +1688,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /** {@inheritDoc} */
     @Override protected void onContextDestroyed0() {
+        stopping = true;
+
         if (ctxInitLatch.getCount() > 0)
             // Safety.
             ctxInitLatch.countDown();
@@ -1428,20 +1758,33 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             GridCommunicationClient client = null;
 
             try {
-                client = reserveClient(node);
+                boolean retry;
+
+                do {
+                    client = reserveClient(node);
 
-                UUID nodeId = null;
+                    UUID nodeId = null;
 
-                if (!client.async() && !getSpiContext().localNode().version().equals(node.version()))
-                    nodeId = node.id();
+                    if (!client.async() && !getSpiContext().localNode().version().equals(node.version()))
+                        nodeId = node.id();
 
-                client.sendMessage(nodeId, msg);
+                    retry = client.sendMessage(nodeId, msg);
 
-                client.release();
+                    client.release();
 
-                client = null;
+                    client = null;
+
+                    if (!retry)
+                        sentMsgsCnt.increment();
+                    else {
+                        ClusterNode node0 = getSpiContext().node(node.id());
 
-                sentMsgsCnt.increment();
+                        if (node0 == null)
+                            throw new GridException("Failed to send message to remote node " +
+                                "(node has left the grid): " + node.id());
+                    }
+                }
+                while (retry);
             }
             catch (GridException e) {
                 throw new IgniteSpiException("Failed to send message to remote node: " + node, e);
@@ -1469,23 +1812,49 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             GridCommunicationClient client = clients.get(nodeId);
 
             if (client == null) {
+                if (stopping)
+                    throw new IgniteSpiException("Grid is stopping.");
+
                 // Do not allow concurrent connects.
-                Object sync = locks.lock(nodeId);
+                GridFutureAdapterEx<GridCommunicationClient> fut = new ConnectFuture();
 
-                try {
-                    client = clients.get(nodeId);
+                GridFutureAdapterEx<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(nodeId, fut);
+
+                if (oldFut == null) {
+                    try {
+                        GridCommunicationClient client0 = clients.get(nodeId);
+
+                        if (client0 == null) {
+                            client0 = createNioClient(node);
+
+                            if (client0 != null) {
+                                GridCommunicationClient old = clients.put(nodeId, client0);
+
+                                assert old == null;
+                            }
+                            else
+                                U.sleep(200);
+                        }
 
-                    if (client == null) {
-                        GridCommunicationClient old = clients.put(nodeId, client = createNioClient(node));
+                        fut.onDone(client0);
+                    }
+                    catch (Throwable e) {
+                        fut.onDone(e);
 
-                        assert old == null;
+                        if (e instanceof Error)
+                            throw (Error)e;
+                    }
+                    finally {
+                        clientFuts.remove(nodeId, fut);
                     }
                 }
-                finally {
-                    locks.unlock(nodeId, sync);
-                }
+                else
+                    fut = oldFut;
 
-                assert client != null;
+                client = fut.get();
+
+                if (client == null)
+                    continue;
 
                 if (getSpiContext().node(nodeId) == null) {
                     if (clients.remove(nodeId, client))
@@ -1570,7 +1939,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             try {
-                safeHandshake(client, node.id(), connTimeout0);
+                safeHandshake(client, null, node.id(), connTimeout0);
             }
             catch (HandshakeTimeoutException e) {
                 if (log.isDebugEnabled())
@@ -1661,59 +2030,74 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
             while (!conn) { // Reconnection on handshake timeout.
                 try {
-                    if (asyncSnd) {
-                        SocketChannel ch = SocketChannel.open();
+                    SocketChannel ch = SocketChannel.open();
 
-                        ch.configureBlocking(true);
+                    ch.configureBlocking(true);
 
-                        ch.socket().setTcpNoDelay(tcpNoDelay);
-                        ch.socket().setKeepAlive(true);
+                    ch.socket().setTcpNoDelay(tcpNoDelay);
+                    ch.socket().setKeepAlive(true);
 
-                        if (sockRcvBuf > 0)
-                            ch.socket().setReceiveBufferSize(sockRcvBuf);
+                    if (sockRcvBuf > 0)
+                        ch.socket().setReceiveBufferSize(sockRcvBuf);
 
-                        if (sockSndBuf > 0)
-                            ch.socket().setSendBufferSize(sockSndBuf);
+                    if (sockSndBuf > 0)
+                        ch.socket().setSendBufferSize(sockSndBuf);
 
-                        ch.socket().connect(addr, (int)connTimeout);
+                    GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(node);
 
-                        safeHandshake(ch, node.id(), connTimeout0);
+                    if (!recoveryDesc.reserve()) {
+                        U.closeQuiet(ch);
 
-                        UUID diffVerNodeId = null;
+                        return null;
+                    }
 
-                        IgniteProductVersion locVer = getSpiContext().localNode().version();
-                        IgniteProductVersion rmtVer = node.version();
+                    long rcvCnt = -1;
 
-                        if (!locVer.equals(rmtVer))
-                            diffVerNodeId = node.id();
+                    try {
+                        ch.socket().connect(addr, (int)connTimeout);
 
-                        GridNioSession ses = nioSrvr.createSession(
-                            ch,
-                            F.asMap(
-                                NODE_ID_META, node.id(),
-                                GridNioServer.DIFF_VER_NODE_ID_META_KEY, diffVerNodeId)
-                        ).get();
+                        rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0);
 
-                        client = new GridTcpNioCommunicationClient(ses);
+                        if (rcvCnt == -1)
+                            return null;
                     }
-                    else {
-                        client = new GridTcpCommunicationClient(
-                            metricsLsnr,
-                            msgWriter,
-                            addr,
-                            locHost,
-                            connTimeout,
-                            tcpNoDelay,
-                            sockRcvBuf,
-                            sockSndBuf,
-                            connBufSize,
-                            minBufferedMsgCnt,
-                            bufSizeRatio);
-
-                        safeHandshake(client, node.id(), connTimeout0);
+                    finally {
+                        if (recoveryDesc != null && rcvCnt == -1)
+                            recoveryDesc.release();
                     }
 
-                    conn = true;
+                    UUID diffVerNodeId = null;
+
+                    IgniteProductVersion locVer = getSpiContext().localNode().version();
+                    IgniteProductVersion rmtVer = node.version();
+
+                    if (!locVer.equals(rmtVer))
+                        diffVerNodeId = node.id();
+
+                    try {
+                        Map<Integer, Object> meta = new HashMap<>();
+
+                        meta.put(NODE_ID_META, node.id());
+                        meta.put(GridNioServer.DIFF_VER_NODE_ID_META_KEY, diffVerNodeId);
+
+                        if (recoveryDesc != null) {
+                            recoveryDesc.onHandshake(rcvCnt);
+
+                            meta.put(-1, recoveryDesc);
+                        }
+
+                        GridNioSession ses = nioSrvr.createSession(ch, meta).get();
+
+                        client = new GridTcpNioCommunicationClient(ses, log);
+
+                        conn = true;
+                    }
+                    finally {
+                        if (!conn) {
+                            if (recoveryDesc != null)
+                                recoveryDesc.release();
+                        }
+                    }
                 }
                 catch (HandshakeTimeoutException e) {
                     if (client != null) {
@@ -1812,16 +2196,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * Performs handshake in timeout-safe way.
      *
      * @param client Client.
+     * @param recovery Recovery descriptor if the recovery handshake should be used, otherwise {@code null}.
      * @param rmtNodeId Remote node.
      * @param timeout Timeout for handshake.
      * @throws GridException If handshake failed or wasn't completed withing timeout.
+     * @return Handshake response.
      */
     @SuppressWarnings("ThrowFromFinallyBlock")
-    private <T> void safeHandshake(T client, UUID rmtNodeId, long timeout) throws GridException {
+    private <T> long safeHandshake(
+        T client,
+        @Nullable GridNioRecoveryDescriptor recovery,
+        UUID rmtNodeId,
+        long timeout
+    ) throws GridException {
         HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
 
         sockTimeoutWorker.addTimeoutObject(obj);
 
+        long rcvCnt = 0;
+
         try {
             if (client instanceof GridCommunicationClient)
                 ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId));
@@ -1851,9 +2244,62 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         log.debug("Received remote node ID: " + rmtNodeId0);
 
                     ch.write(ByteBuffer.wrap(U.GG_HEADER));
-                    ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType));
 
-                    success = true;
+                    if (recovery != null) {
+                        HandshakeMessage msg = new HandshakeMessage(locNodeId,
+                            recovery.incrementConnectCount(),
+                            recovery.receivedCount());
+
+                        if (log.isDebugEnabled())
+                            log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
+
+                        buf = ByteBuffer.allocate(33);
+
+                        buf.order(ByteOrder.nativeOrder());
+
+                        boolean written = msg.writeTo(buf);
+
+                        assert written;
+
+                        buf.flip();
+
+                        ch.write(buf);
+                    }
+                    else
+                        ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType));
+
+                    if (recovery != null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
+
+                        buf = ByteBuffer.allocate(9);
+
+                        buf.order(ByteOrder.nativeOrder());
+
+                        for (int i = 0; i < 9; ) {
+                            int read = ch.read(buf);
+
+                            if (read == -1)
+                                throw new GridException("Failed to read remote node recovery handshake " +
+                                    "(connection closed).");
+
+                            i += read;
+                        }
+
+                        rcvCnt = buf.getLong(1);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+
+                        if (rcvCnt == -1) {
+                            if (log.isDebugEnabled())
+                                log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
+                        }
+                        else
+                            success = true;
+                    }
+                    else
+                        success = true;
                 }
                 catch (IOException e) {
                     if (log.isDebugEnabled())
@@ -1878,6 +2324,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 throw new HandshakeTimeoutException("Failed to perform handshake due to timeout (consider increasing " +
                     "'connectionTimeout' configuration property).");
         }
+
+        return rcvCnt;
     }
 
     /**
@@ -1896,11 +2344,83 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 "is node stopping?) [senderNodeId=" + sndId + ", msg=" + msg + ']');
     }
 
+    /**
+     * @param node Node whose ID and order form the descriptor key.
+     * @return Recovery descriptor registered for the given node, created on first request.
+     */
+    private GridNioRecoveryDescriptor recoveryDescriptor(ClusterNode node) {
+        ClientKey id = new ClientKey(node.id(), node.order());
+
+        GridNioRecoveryDescriptor recovery = recoveryDescs.get(id);
+
+        if (recovery == null) {
+            int maxSize = Math.max(msgQueueLimit, ackSndThreshold);
+            // Explicit buffer size wins; otherwise use 5x the larger of queue limit and ack threshold.
+            int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 5);
+
+            GridNioRecoveryDescriptor old =
+                recoveryDescs.putIfAbsent(id, recovery = new GridNioRecoveryDescriptor(queueLimit, node, log));
+            // Lost the race: use the descriptor that stayed registered instead of ours.
+            if (old != null)
+                recovery = old;
+        }
+
+        return recovery;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpCommunicationSpi.class, this);
     }
 
+    /**
+     * Key of a recovery descriptor: remote node ID plus node order.
+     */
+    private static class ClientKey {
+        /** Node ID. */
+        private final UUID nodeId;
+
+        /** Node order. */
+        private final long order;
+
+        /**
+         * @param nodeId Node ID.
+         * @param order Node order.
+         */
+        private ClientKey(UUID nodeId, long order) {
+            this.nodeId = nodeId;
+            this.order = order;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+
+            if (obj == null || getClass() != obj.getClass())
+                return false;
+
+            ClientKey other = (ClientKey)obj;
+
+            return order == other.order && nodeId.equals(other.nodeId);
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = nodeId.hashCode();
+
+            res = 31 * res + (int)(order ^ (order >>> 32));
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ClientKey.class, this);
+        }
+    }
+
     /** Internal exception class for proper timeout handling. */
     private static class HandshakeTimeoutException extends GridException {
         /** */
@@ -2032,12 +2552,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         @SuppressWarnings({"BusyWait"})
         @Override protected void body() throws InterruptedException {
             while (!isInterrupted()) {
+                cleanupRecovery();
+
                 for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) {
                     UUID nodeId = e.getKey();
 
                     GridCommunicationClient client = e.getValue();
 
-                    if (getSpiContext().node(nodeId) == null) {
+                    ClusterNode node = getSpiContext().node(nodeId);
+
+                    if (node == null) {
                         if (log.isDebugEnabled())
                             log.debug("Forcing close of non-existent node connection: " + nodeId);
 
@@ -2048,9 +2572,39 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         continue;
                     }
 
+                    GridNioRecoveryDescriptor recovery = null;
+
+                    if (client instanceof GridTcpNioCommunicationClient) {
+                        recovery = recoveryDescs.get(new ClientKey(node.id(), node.order()));
+
+                        if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
+                            RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
+
+                            if (log.isDebugEnabled())
+                                log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
+                                    ", rcvCnt=" + msg.received() + ']');
+
+                            nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
+
+                            recovery.lastAcknowledged(msg.received());
+
+                            continue;
+                        }
+                    }
+
                     long idleTime = client.getIdleTime();
 
                     if (idleTime >= idleConnTimeout) {
+                        if (recovery != null &&
+                            recovery.nodeAlive(getSpiContext().node(nodeId)) &&
+                            !recovery.messagesFutures().isEmpty()) {
+                            if (log.isDebugEnabled())
+                                log.debug("Node connection is idle, but there are unacknowledged messages, " +
+                                    "will wait: " + nodeId);
+
+                            continue;
+                        }
+
                         if (log.isDebugEnabled())
                             log.debug("Closing idle node connection: " + nodeId);
 
@@ -2062,6 +2616,38 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 Thread.sleep(idleConnTimeout);
             }
         }
+
+        /**
+         * Removes recovery descriptors of nodes that are no longer alive, calling onNodeLeft() on each.
+         */
+        private void cleanupRecovery() {
+            Set<ClientKey> left = null;
+
+            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
+                if (left != null && left.contains(e.getKey()))
+                    continue;
+
+                GridNioRecoveryDescriptor recoverySnd = e.getValue();
+
+                if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) {
+                    if (left == null)
+                        left = new HashSet<>();
+
+                    left.add(e.getKey());
+                }
+            }
+
+            if (left != null) {
+                assert !left.isEmpty();
+
+                for (ClientKey id : left) {
+                    GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id);
+
+                    if (recoverySnd != null)
+                        recoverySnd.onNodeLeft();
+                }
+            }
+        }
     }
 
     /**
@@ -2213,6 +2799,84 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /**
      *
      */
+    private class RecoveryWorker extends IgniteSpiThread {
+        /** Descriptors waiting for a reconnect attempt. */
+        private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>();
+
+        /**
+         * Creates the worker thread named {@code tcp-comm-recovery-worker}.
+         */
+        private RecoveryWorker() {
+            super(gridName, "tcp-comm-recovery-worker", log);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            if (log.isDebugEnabled())
+                log.debug("Recovery worker has been started.");
+
+            while (!isInterrupted()) {
+                GridNioRecoveryDescriptor recoveryDesc = q.take();
+
+                assert recoveryDesc != null;
+
+                ClusterNode node = recoveryDesc.node();
+
+                if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
+                    continue;
+
+                try {
+                    if (log.isDebugEnabled())
+                        log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
+
+                    GridCommunicationClient client = reserveClient(node);
+
+                    client.release();
+                }
+                catch (GridException e) {
+                    if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) {
+                        if (log.isDebugEnabled())
+                            log.debug("Recovery reconnect failed, will retry " +
+                                "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+
+                        addReconnectRequest(recoveryDesc);
+                    }
+                    else if (log.isDebugEnabled())
+                        log.debug("Recovery reconnect failed, " +
+                            "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+
+                }
+            }
+        }
+
+        /**
+         * @param recoverySnd Recovery descriptor of the node to reconnect to; appended to the worker queue.
+         */
+        void addReconnectRequest(GridNioRecoveryDescriptor recoverySnd) {
+            boolean add = q.add(recoverySnd);
+
+            assert add;
+        }
+    }
+
+    /**
+     * Future used to share the result of a single connect attempt between concurrent callers.
+     */
+    private static class ConnectFuture extends GridFutureAdapterEx<GridCommunicationClient> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Empty constructor required for {@link Externalizable}.
+         */
+        public ConnectFuture() {
+            // No-op.
+        }
+    }
+
+    /**
+     *
+     */
     private static class HandshakeTimeoutObject<T> {
         /** */
         private static final AtomicLong idGen = new AtomicLong();
@@ -2355,6 +3019,206 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     }
 
     /**
+     * Handshake message.
+     */
+    @SuppressWarnings("PublicInnerClass")
+    public static class HandshakeMessage extends GridTcpCommunicationMessageAdapter {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private UUID nodeId;
+
+        /** */
+        private long rcvCnt;
+
+        /** */
+        private long connectCnt;
+
+        /**
+         * Default constructor required by {@link GridTcpCommunicationMessageAdapter}.
+         */
+        public HandshakeMessage() {
+            // No-op.
+        }
+
+        /**
+         * @param nodeId Node ID.
+         * @param connectCnt Connect count.
+         * @param rcvCnt Number of received messages.
+         */
+        public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) {
+            assert nodeId != null;
+            assert rcvCnt >= 0 : rcvCnt;
+
+            this.nodeId = nodeId;
+            this.connectCnt = connectCnt;
+            this.rcvCnt = rcvCnt;
+        }
+
+        /**
+         * @return Connect count.
+         */
+        public long connectCount() {
+            return connectCnt;
+        }
+
+        /**
+         * @return Number of received messages.
+         */
+        public long received() {
+            return rcvCnt;
+        }
+
+        /**
+         * @return Node ID.
+         */
+        public UUID nodeId() {
+            return nodeId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean writeTo(ByteBuffer buf) {
+            if (buf.remaining() < 33)
+                return false;
+
+            buf.put(HANDSHAKE_MSG_TYPE);
+
+            byte[] bytes = U.uuidToBytes(nodeId);
+
+            assert bytes.length == 16 : bytes.length;
+
+            buf.put(bytes);
+
+            buf.putLong(rcvCnt);
+
+            buf.putLong(connectCnt);
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean readFrom(ByteBuffer buf) {
+            if (buf.remaining() < 32)
+                return false;
+
+            byte[] nodeIdBytes = new byte[16];
+
+            buf.get(nodeIdBytes);
+
+            nodeId = U.bytesToUuid(nodeIdBytes, 0);
+
+            rcvCnt = buf.getLong();
+
+            connectCnt = buf.getLong();
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public byte directType() {
+            return HANDSHAKE_MSG_TYPE;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("CloneDoesntCallSuperClone")
+        @Override public GridTcpCommunicationMessageAdapter clone() {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void clone0(GridTcpCommunicationMessageAdapter msg) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(HandshakeMessage.class, this);
+        }
+    }
+
+    /**
+     * Recovery acknowledgment message.
+     */
+    @SuppressWarnings("PublicInnerClass")
+    public static class RecoveryLastReceivedMessage extends GridTcpCommunicationMessageAdapter {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private long rcvCnt;
+
+        /**
+         * Default constructor required by {@link GridTcpCommunicationMessageAdapter}.
+         */
+        public RecoveryLastReceivedMessage() {
+            // No-op.
+        }
+
+        /**
+         * @param rcvCnt Number of received messages.
+         */
+        public RecoveryLastReceivedMessage(long rcvCnt) {
+            this.rcvCnt = rcvCnt;
+        }
+
+        /**
+         * @return Number of received messages.
+         */
+        public long received() {
+            return rcvCnt;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean writeTo(ByteBuffer buf) {
+            if (buf.remaining() < 9)
+                return false;
+
+            buf.put(RECOVERY_LAST_ID_MSG_TYPE);
+
+            buf.putLong(rcvCnt);
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean readFrom(ByteBuffer buf) {
+            if (buf.remaining() < 8)
+                return false;
+
+            rcvCnt = buf.getLong();
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public byte directType() {
+            return RECOVERY_LAST_ID_MSG_TYPE;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+        @Override public GridTcpCommunicationMessageAdapter clone() {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void clone0(GridTcpCommunicationMessageAdapter msg) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean skipRecovery() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RecoveryLastReceivedMessage.class, this);
+        }
+    }
+
+    /**
      * Node ID message.
      */
     @SuppressWarnings("PublicInnerClass")
@@ -2426,5 +3290,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
             // No-op.
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(NodeIdMessage.class, this);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index b35e735..1255925 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -70,15 +70,6 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
     public int getSelectorsCount();
 
     /**
-     * Gets flag defining whether asynchronous (NIO) or synchronous (blocking) IO
-     * should be used to send messages.
-     *
-     * @return {@code True} if asynchronous IO should be used to send messages.
-     */
-    @IgniteMBeanDescription("Asynchronous send.")
-    public boolean isAsyncSend();
-
-    /**
      * Gets sent messages count.
      *
      * @return Sent messages count.
@@ -220,23 +211,7 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
     public int getSocketSendBuffer();
 
     /**
-     * Gets flag indicating whether dual-socket connection between nodes should be enforced. If set to
-     * {@code true}, two separate connections will be established between communicating nodes: one for outgoing
-     * messages, and one for incoming. When set to {@code false}, single {@code TCP} connection will be used
-     * for both directions.
-     * <p>
-     * This flag is useful on some operating systems, when {@code TCP_NODELAY} flag is disabled and
-     * messages take too long to get delivered.
-     *
-     * @return Whether dual-socket connection should be enforced.
-     */
-    @IgniteMBeanDescription("Dual-socket connection.")
-    public boolean isDualSocketConnection();
-
-    /**
      * Gets message queue limit for incoming and outgoing messages.
-     * <p>
-     * This parameter only used when {@link #isAsyncSend()} set to {@code true}.
      *
      * @return Send queue size limit.
      */
@@ -260,4 +235,31 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
      */
     @IgniteMBeanDescription("Buffer size ratio.")
     public double getBufferSizeRatio();
+
+    /**
+     * Gets socket write timeout for TCP connections. If message can not be written to
+     * socket within this time then connection is closed and reconnect is attempted.
+     *
+     * @return Socket write timeout for TCP connections.
+     */
+    @IgniteMBeanDescription("Socket write timeout.")
+    public long getSocketWriteTimeout();
+
+    /**
+     * Gets number of received messages per connection to node after which acknowledgment message is sent.
+     *
+     * @return Number of received messages after which acknowledgment is sent.
+     */
+    @IgniteMBeanDescription("Number of received messages after which acknowledgment is sent.")
+    public int getAckSendThreshold();
+
+    /**
+     * Gets maximum number of stored unacknowledged messages per connection to node.
+     * If number of unacknowledged messages exceeds this number then connection to node is
+     * closed and reconnect is attempted.
+     *
+     * @return Maximum number of unacknowledged messages.
+     */
+    @IgniteMBeanDescription("Maximum number of unacknowledged messages.")
+    public int getUnacknowledgedMessagesBufferSize();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
index 5377e18..ffa7b3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -310,7 +310,8 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov
     @Override protected void onContextDestroyed0() {
         super.onContextDestroyed0();
 
-        ipFinder.onSpiContextDestroyed();
+        if (ipFinder != null)
+            ipFinder.onSpiContextDestroyed();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java
index 8c479ff..af63a2f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java
@@ -168,6 +168,13 @@ public abstract class GridTcpCommunicationMessageAdapter implements Serializable
     protected abstract void clone0(GridTcpCommunicationMessageAdapter _msg);
 
     /**
+     * @return {@code True} if recovery should be skipped for this message.
+     */
+    public boolean skipRecovery() {
+        return false;
+    }
+
+    /**
      * @param arr Array.
      * @return Array iterator.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
index 460c827..7c92065 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
@@ -282,8 +282,14 @@ public class GridTcpCommunicationMessageFactory {
      * @return New message.
      */
     public static GridTcpCommunicationMessageAdapter create(byte type) {
-        return type == TcpCommunicationSpi.NODE_ID_MSG_TYPE ? new TcpCommunicationSpi.NodeIdMessage() :
-            create0(type);
+        if (type == TcpCommunicationSpi.NODE_ID_MSG_TYPE)
+            return new TcpCommunicationSpi.NodeIdMessage();
+        else if (type == TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE)
+            return new TcpCommunicationSpi.RecoveryLastReceivedMessage();
+        else if (type == TcpCommunicationSpi.HANDSHAKE_MSG_TYPE)
+            return new TcpCommunicationSpi.HandshakeMessage();
+        else
+            return create0(type);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java
index 4128475..5b0db53 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java
@@ -87,8 +87,9 @@ public interface GridCommunicationClient {
      * @param nodeId Node ID (provided only if versions of local and remote nodes are different).
      * @param msg Message to send.
      * @throws GridException If failed.
+     * @return {@code True} if should try to resend message.
      */
-    void sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) throws GridException;
+    boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) throws GridException;
 
     /**
      * @param timeout Timeout.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFinishedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFinishedFuture.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFinishedFuture.java
index 980b548..d6fcb60 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFinishedFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFinishedFuture.java
@@ -99,4 +99,9 @@ public class GridNioFinishedFuture<R> implements GridNioFuture<R> {
     @Override public boolean messageThread() {
         return msgThread;
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean skipRecovery() {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java
index 2775c55..a3ab1ef 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java
@@ -28,7 +28,7 @@ public interface GridNioFuture<R> {
      *
      * @return Operation result.
      * @throws GridInterruptedException Subclass of {@link GridException} thrown if the wait was interrupted.
-     * @throws org.apache.ignite.lang.IgniteFutureCancelledException Subclass of {@link GridException} throws if operation was cancelled.
+     * @throws IgniteFutureCancelledException Subclass of {@link GridException} thrown if operation was cancelled.
      * @throws GridException If operation failed.
      * @throws IOException If IOException occurred while performing operation.
      */
@@ -42,8 +42,8 @@ public interface GridNioFuture<R> {
      * @param timeout The maximum time to wait in milliseconds.
      * @return Operation result.
      * @throws GridInterruptedException Subclass of {@link GridException} thrown if the wait was interrupted.
-     * @throws org.apache.ignite.lang.IgniteFutureTimeoutException Subclass of {@link GridException} thrown if the wait was timed out.
-     * @throws org.apache.ignite.lang.IgniteFutureCancelledException Subclass of {@link GridException} throws if operation was cancelled.
+     * @throws IgniteFutureTimeoutException Subclass of {@link GridException} thrown if the wait was timed out.
+     * @throws IgniteFutureCancelledException Subclass of {@link GridException} thrown if operation was cancelled.
      * @throws GridException If operation failed.
      * @throws IOException If IOException occurred while performing operation.
      */
@@ -57,8 +57,8 @@ public interface GridNioFuture<R> {
      * @param unit The time unit of the {@code timeout} argument.
      * @return Operation result.
      * @throws GridInterruptedException Subclass of {@link GridException} thrown if the wait was interrupted.
-     * @throws org.apache.ignite.lang.IgniteFutureTimeoutException Subclass of {@link GridException} thrown if the wait was timed out.
-     * @throws org.apache.ignite.lang.IgniteFutureCancelledException Subclass of {@link GridException} throws if operation was cancelled.
+     * @throws IgniteFutureTimeoutException Subclass of {@link GridException} thrown if the wait was timed out.
+     * @throws IgniteFutureCancelledException Subclass of {@link GridException} thrown if operation was cancelled.
      * @throws GridException If operation failed.
      * @throws IOException If IOException occurred while performing operation.
      */
@@ -104,4 +104,9 @@ public interface GridNioFuture<R> {
      * @return {@code True} if future was created in thread that was processing message.
      */
     public boolean messageThread();
+
+    /**
+     * @return {@code True} if recovery should be skipped for this operation.
+     */
+    public boolean skipRecovery();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java
index 3d5c4e9..7ab2e14 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java
@@ -118,7 +118,7 @@ public class GridNioFutureImpl<R> extends AbstractQueuedSynchronizer implements
      * @param nanosTimeout Timeout (nanoseconds).
      * @return Result.
      * @throws InterruptedException If interrupted.
-     * @throws org.apache.ignite.lang.IgniteFutureTimeoutException If timeout reached before computation completed.
+     * @throws IgniteFutureTimeoutException If timeout reached before computation completed.
      * @throws GridException If error occurred.
      */
     @Nullable protected R get0(long nanosTimeout) throws InterruptedException, GridException {
@@ -307,6 +307,11 @@ public class GridNioFutureImpl<R> extends AbstractQueuedSynchronizer implements
     }
 
     /** {@inheritDoc} */
+    @Override public boolean skipRecovery() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridNioFutureImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioRecoveryDescriptor.java
new file mode 100644
index 0000000..846a388
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioRecoveryDescriptor.java
@@ -0,0 +1,356 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.util.nio;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Recovery information for single node.
+ */
+public class GridNioRecoveryDescriptor {
+    /** Number of acknowledged messages. */
+    private long acked;
+
+    /** Unacknowledged message futures. */
+    private final ArrayDeque<GridNioFuture<?>> msgFuts;
+
+    /** Number of messages to resend. */
+    private int resendCnt;
+
+    /** Number of received messages. */
+    private long rcvCnt;
+
+    /** Reserved flag. */
+    private boolean reserved;
+
+    /** Last acknowledged message. */
+    private long lastAck;
+
+    /** Node left flag. */
+    private boolean nodeLeft;
+
+    /** Target node. */
+    private final ClusterNode node;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Incoming connection request from remote node. */
+    private IgniteBiTuple<Long, IgniteInClosure<Boolean>> handshakeReq;
+
+    /** Connected flag. */
+    private boolean connected;
+
+    /** Number of outgoing connect attempts. */
+    private long connectCnt;
+
+    /** Maximum size of unacknowledged messages queue. */
+    private final int queueLimit;
+
+    /**
+     * @param queueLimit Maximum size of unacknowledged messages queue.
+     * @param node Node.
+     * @param log Logger.
+     */
+    public GridNioRecoveryDescriptor(int queueLimit, ClusterNode node, IgniteLogger log) {
+        assert !node.isLocal() : node;
+        assert queueLimit > 0;
+
+        msgFuts = new ArrayDeque<>(queueLimit);
+
+        this.queueLimit = queueLimit;
+        this.node = node;
+        this.log = log;
+    }
+
+    /**
+     * @return Connect count value before it was incremented.
+     */
+    public long incrementConnectCount() {
+        return connectCnt++;
+    }
+
+    /**
+     * @return Node.
+     */
+    public ClusterNode node() {
+        return node;
+    }
+
+    /**
+     * Increments received messages counter.
+     *
+     * @return Number of received messages.
+     */
+    public long onReceived() {
+        rcvCnt++;
+
+        return rcvCnt;
+    }
+
+    /**
+     * @return Number of received messages.
+     */
+    public long received() {
+        return rcvCnt;
+    }
+
+    /**
+     * @param lastAck Last acknowledged message.
+     */
+    public void lastAcknowledged(long lastAck) {
+        this.lastAck = lastAck;
+    }
+
+    /**
+     * @return Last acknowledged message.
+     */
+    public long lastAcknowledged() {
+        return lastAck;
+    }
+
+    /**
+     * @return Received messages count.
+     */
+    public long receivedCount() {
+        return rcvCnt;
+    }
+
+    /**
+     * @return Maximum size of unacknowledged messages queue.
+     */
+    public int queueLimit() {
+        return queueLimit;
+    }
+
+    /**
+     * @param fut NIO future.
+     * @return {@code False} if future was queued and queue size reached the limit, {@code true} otherwise.
+     */
+    public boolean add(GridNioFuture<?> fut) {
+        assert fut != null;
+
+        if (!fut.skipRecovery()) {
+            if (resendCnt == 0) {
+                msgFuts.addLast(fut);
+
+                return msgFuts.size() < queueLimit;
+            }
+            else
+                resendCnt--;
+        }
+
+        return true;
+    }
+
+    /**
+     * @param rcvCnt Number of messages received by remote node.
+     */
+    public void ackReceived(long rcvCnt) {
+        if (log.isDebugEnabled())
+            log.debug("Handle acknowledgment [acked=" + acked + ", rcvCnt=" + rcvCnt +
+                ", msgFuts=" + msgFuts.size() + ']');
+
+        while (acked < rcvCnt) {
+            GridNioFuture<?> fut = msgFuts.pollFirst();
+
+            assert fut != null;
+
+            ((GridNioFutureImpl)fut).onDone();
+
+            acked++;
+        }
+    }
+
+    /**
+     * Node left callback.
+     */
+    public void onNodeLeft() {
+        GridNioFuture<?>[] futs = null;
+
+        synchronized (this) {
+            nodeLeft = true;
+
+            if (!reserved && !msgFuts.isEmpty()) {
+                futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]);
+
+                msgFuts.clear();
+            }
+        }
+
+        if (futs != null)
+            completeOnNodeLeft(futs);
+    }
+
+    /**
+     * @return Message futures for unacknowledged messages.
+     */
+    public Deque<GridNioFuture<?>> messagesFutures() {
+        return msgFuts;
+    }
+
+    /**
+     * @param node Node.
+     * @return {@code True} if node is not null and has the same order as initial remote node.
+     */
+    public boolean nodeAlive(@Nullable ClusterNode node) {
+        return node != null && node.order() == this.node.order();
+    }
+
+    /**
+     * @throws InterruptedException If interrupted.
+     * @return {@code True} if reserved.
+     */
+    public boolean reserve() throws InterruptedException {
+        synchronized (this) {
+            while (!connected && reserved)
+                wait();
+
+            if (!connected)
+                reserved = true;
+
+            return !connected;
+        }
+    }
+
+    /**
+     * @param rcvCnt Number of messages received by remote node.
+     */
+    public void onHandshake(long rcvCnt) {
+        ackReceived(rcvCnt);
+
+        resendCnt = msgFuts.size();
+    }
+
+    /**
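+     * Marks descriptor as connected, applies {@code false} to pending handshake closure (if any) and notifies waiters.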
+     */
+    public void connected() {
+        synchronized (this) {
+            assert reserved;
+            assert !connected;
+
+            connected = true;
+
+            if (handshakeReq != null) {
+                IgniteInClosure<Boolean> c = handshakeReq.get2();
+
+                assert c != null;
+
+                c.apply(false);
+
+                handshakeReq = null;
+            }
+
+            notifyAll();
+        }
+    }
+
+    /**
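+     * Disconnects: applies {@code true} to pending handshake closure or else unreserves; fails futures if node has left.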
+     */
+    public void release() {
+        GridNioFuture<?>[] futs = null;
+
+        synchronized (this) {
+            connected = false;
+
+            if (handshakeReq != null) {
+                IgniteInClosure<Boolean> c = handshakeReq.get2();
+
+                assert c != null;
+
+                handshakeReq = null;
+
+                c.apply(true);
+            }
+            else {
+                reserved = false;
+
+                notifyAll();
+            }
+
+            if (nodeLeft && !msgFuts.isEmpty()) {
+                futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]);
+
+                msgFuts.clear();
+            }
+        }
+
+        if (futs != null)
+            completeOnNodeLeft(futs);
+    }
+
+    /**
+     * @param id Handshake ID.
+     * @param c Closure to run on reserve.
+     * @return {@code True} if reserved.
+     */
+    public boolean tryReserve(long id, IgniteInClosure<Boolean> c) {
+        synchronized (this) {
+            if (connected) {
+                c.apply(false);
+
+                return false;
+            }
+
+            if (reserved) {
+                if (handshakeReq != null) {
+                    assert handshakeReq.get1() != null;
+
+                    long id0 = handshakeReq.get1();
+
+                    assert id0 != id : id0;
+
+                    if (id > id0) {
+                        IgniteInClosure<Boolean> c0 = handshakeReq.get2();
+
+                        assert c0 != null;
+
+                        c0.apply(false);
+
+                        handshakeReq = new IgniteBiTuple<>(id, c);
+                    }
+                    else
+                        c.apply(false);
+                }
+                else
+                    handshakeReq = new IgniteBiTuple<>(id, c);
+
+                return false;
+            }
+            else {
+                reserved = true;
+
+                return true;
+            }
+        }
+    }
+
+    /**
+     * @param futs Futures to complete.
+     */
+    private void completeOnNodeLeft(GridNioFuture<?>[] futs) {
+        for (GridNioFuture<?> msg : futs)
+            ((GridNioFutureImpl)msg).onDone(new IOException("Failed to send message, node has left: " + node.id()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNioRecoveryDescriptor.class, this);
+    }
+}