You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/02/13 12:54:09 UTC

[1/3] ignite git commit: ignite-4003 Async outgoing connections for communication SPI [Forced Update!]

Repository: ignite
Updated Branches:
  refs/heads/ignite-4003 eb7de98db -> 99e24a6c3 (forced update)


http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
index 0dd4079..e5b78db 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
@@ -50,8 +50,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
         // Try provoke connection close on socket writeTimeout.
         commSpi.setSharedMemoryPort(-1);
         commSpi.setMessageQueueLimit(10);
-        commSpi.setSocketReceiveBuffer(40);
-        commSpi.setSocketSendBuffer(40);
+        commSpi.setSocketReceiveBuffer(64);
+        commSpi.setSocketSendBuffer(64);
         commSpi.setSocketWriteTimeout(100);
         commSpi.setUnacknowledgedMessagesBufferSize(1000);
         commSpi.setConnectTimeout(10_000);

http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
index 552dd28..c84a552 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
@@ -678,7 +678,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
         try {
             SocketChannel ch = SocketChannel.open(new InetSocketAddress(U.getLocalHost(), srvr2.port()));
 
-            GridNioFuture<GridNioSession> fut = srvr1.createSession(ch, null);
+            GridNioFuture<GridNioSession> fut = srvr1.createSession(ch, null, false, null);
 
             ses = fut.get();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
index 5ca8f26..e58dbc0 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.AddressResolver;
 import org.apache.ignite.configuration.BasicAddressResolver;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.nio.GridCommunicationClient;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteCallable;
@@ -111,7 +112,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
         cfg.setConnectorConfiguration(null);
 
         TcpCommunicationSpi commSpi = new TcpCommunicationSpi() {
-            @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx)
+            @Override protected IgniteInternalFuture<GridCommunicationClient> createTcpClient(ClusterNode node, int connIdx)
                 throws IgniteCheckedException {
                 Map<String, Object> attrs = new HashMap<>(node.attributes());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
index 93339ed..e246dd9 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -39,6 +40,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter;
 import org.apache.ignite.testframework.GridSpiTestContext;
 import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.IgniteMock;
 import org.apache.ignite.testframework.junits.IgniteTestResources;
 import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
@@ -70,6 +72,9 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
     private static final Object mux = new Object();
 
     /** */
+    private static GridTimeoutProcessor timeoutProcessor;
+
+    /** */
     protected boolean useSsl = false;
 
     /**
@@ -289,6 +294,12 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
 
         Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
 
+        timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+        timeoutProcessor.start();
+
+        timeoutProcessor.onKernalStart();
+
         for (int i = 0; i < getSpiCount(); i++) {
             CommunicationSpi<Message> spi = getSpi(i);
 
@@ -298,18 +309,20 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
 
             GridTestNode node = new GridTestNode(rsrcs.getNodeId());
 
-            node.order(i);
-
             GridSpiTestContext ctx = initSpiContext();
 
             ctx.setLocalNode(node);
 
+            ctx.timeoutProcessor(timeoutProcessor);
+
             info(">>> Initialized context: nodeId=" + ctx.localNode().id());
 
             spiRsrcs.add(rsrcs);
 
             rsrcs.inject(spi);
 
+            GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i);
+
             if (useSsl) {
                 IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite");
 
@@ -324,6 +337,8 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
             node.setAttributes(spi.getNodeAttributes());
             node.setAttribute(ATTR_MACS, F.concat(U.allLocalMACs(), ", "));
 
+            node.order(i + 1);
+
             nodes.add(node);
 
             spi.spiStart(getTestGridName() + (i + 1));
@@ -346,6 +361,14 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
 
     /** {@inheritDoc} */
     @Override protected void afterTestsStopped() throws Exception {
+        if (timeoutProcessor != null) {
+            timeoutProcessor.onKernalStop(true);
+
+            timeoutProcessor.stop(true);
+
+            timeoutProcessor = null;
+        }
+
         for (CommunicationSpi<Message> spi : spis.values()) {
             spi.onContextDestroyed();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index a649130..948ca9b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -36,7 +36,9 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.nio.GridCommunicationClient;
 import org.apache.ignite.internal.util.nio.GridNioServer;
@@ -51,6 +53,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
 import org.apache.ignite.testframework.GridSpiTestContext;
 import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.IgniteMock;
 import org.apache.ignite.testframework.junits.IgniteTestResources;
 import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
@@ -79,6 +82,9 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
     protected static final List<ClusterNode> nodes = new ArrayList<>();
 
     /** */
+    private static GridTimeoutProcessor timeoutProcessor;
+
+    /** */
     private static int port = 60_000;
 
     /** Use ssl. */
@@ -407,27 +413,37 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
 
         Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
 
+        timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+        timeoutProcessor.start();
+
+        timeoutProcessor.onKernalStart();
+
         for (int i = 0; i < SPI_CNT; i++) {
             CommunicationSpi<Message> spi = createSpi();
 
-            GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i);
-
             IgniteTestResources rsrcs = new IgniteTestResources();
 
             GridTestNode node = new GridTestNode(rsrcs.getNodeId());
 
+            node.setAttribute(IgniteNodeAttributes.ATTR_CLIENT_MODE, false);
+
             node.order(i + 1);
 
             GridSpiTestContext ctx = initSpiContext();
 
             ctx.setLocalNode(node);
 
+            ctx.timeoutProcessor(timeoutProcessor);
+
             info(">>> Initialized context: nodeId=" + ctx.localNode().id());
 
             spiRsrcs.add(rsrcs);
 
             rsrcs.inject(spi);
 
+            GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i);
+
             if (useSsl) {
                 IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite");
 
@@ -494,6 +510,14 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
      * @throws Exception If failed.
      */
     private void stopSpis() throws Exception {
+        if (timeoutProcessor != null) {
+            timeoutProcessor.onKernalStop(true);
+
+            timeoutProcessor.stop(true);
+
+            timeoutProcessor = null;
+        }
+
         for (CommunicationSpi<Message> spi : spis) {
             spi.onContextDestroyed();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index 12c2edb..3ef46e2 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
 import org.apache.ignite.internal.util.nio.GridNioServer;
@@ -44,6 +45,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
 import org.apache.ignite.testframework.GridSpiTestContext;
 import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.IgniteTestResources;
 import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
 import org.apache.ignite.testframework.junits.spi.GridSpiTest;
@@ -64,11 +66,11 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
     protected static final List<ClusterNode> nodes = new ArrayList<>();
 
     /** */
+    private static GridTimeoutProcessor timeoutProcessor;
+
+    /** */
     private static final int SPI_CNT = 2;
 
-    /**
-     *
-     */
     static {
         GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
             @Override public Message apply() {
@@ -159,6 +161,8 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
                     spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0));
                 }
 
+                U.sleep(500);
+
                 expMsgs += msgPerIter;
 
                 final long totAcked0 = totAcked;
@@ -166,9 +170,14 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
                 for (TcpCommunicationSpi spi : spis) {
                     GridNioServer srv = U.field(spi, "nioSrvr");
 
-                    Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+                    final Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+                    GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                        @Override public boolean apply() {
+                            return !sessions.isEmpty();
+                        }
+                    }, 5_000);
 
-                    assertFalse(sessions.isEmpty());
 
                     boolean found = false;
 
@@ -268,21 +277,21 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
         ClusterNode node0 = nodes.get(0);
         ClusterNode node1 = nodes.get(1);
 
-        final GridNioServer srv1 = U.field(spi1, "nioSrvr");
-
         int msgId = 0;
 
         // Send message to establish connection.
         spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0));
 
+        U.sleep(1000);
+
         // Prevent node1 from send
-        GridTestUtils.setFieldValue(srv1, "skipWrite", true);
+        GridTestUtils.setFieldValue(spi1, "skipAck", true);
 
         final GridNioSession ses0 = communicationSession(spi0);
 
         int sentMsgs = 1;
 
-        for (int i = 0; i < 150; i++) {
+        for (int i = 0; i < 1280; i++) {
             try {
                 spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0));
 
@@ -304,7 +313,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
 
         assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
 
-        GridTestUtils.setFieldValue(srv1, "skipWrite", false);
+        GridTestUtils.setFieldValue(spi1, "skipAck", false);
 
         for (int i = 0; i < 100; i++)
             spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0));
@@ -379,11 +388,15 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
 
         Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
 
+        timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+        timeoutProcessor.start();
+
+        timeoutProcessor.onKernalStart();
+
         for (int i = 0; i < SPI_CNT; i++) {
             TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit);
 
-            GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i);
-
             IgniteTestResources rsrcs = new IgniteTestResources();
 
             GridTestNode node = new GridTestNode(rsrcs.getNodeId());
@@ -392,14 +405,20 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
 
             ctx.setLocalNode(node);
 
+            ctx.timeoutProcessor(timeoutProcessor);
+
             spiRsrcs.add(rsrcs);
 
             rsrcs.inject(spi);
 
+            GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i);
+
             spi.setListener(new TestListener());
 
             node.setAttributes(spi.getNodeAttributes());
 
+            node.order(i);
+
             nodes.add(node);
 
             spi.spiStart(getTestGridName() + (i + 1));
@@ -455,6 +474,14 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
      * @throws Exception If failed.
      */
     private void stopSpis() throws Exception {
+        if (timeoutProcessor != null) {
+            timeoutProcessor.onKernalStop(true);
+
+            timeoutProcessor.stop(true);
+
+            timeoutProcessor = null;
+        }
+
         for (CommunicationSpi<Message> spi : spis) {
             spi.onContextDestroyed();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index 065a3d7..421a5cc 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.nio.GridNioSession;
@@ -47,6 +48,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
 import org.apache.ignite.testframework.GridSpiTestContext;
 import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.IgniteMock;
 import org.apache.ignite.testframework.junits.IgniteTestResources;
 import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
@@ -80,6 +82,9 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
     /** Use ssl. */
     protected boolean useSsl;
 
+    /** */
+    private static GridTimeoutProcessor timeoutProcessor;
+
     /**
      *
      */
@@ -115,7 +120,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
 
         /** {@inheritDoc} */
         @Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) {
-            // info("Test listener received message: " + msg);
+            //info("Test listener received message: " + msg);
 
             assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage);
 
@@ -186,7 +191,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
      * @return Timeout.
      */
     protected long awaitForSocketWriteTimeout() {
-        return 8000;
+        return 20000;
     }
 
     /**
@@ -298,6 +303,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
             // Send message to establish connection.
             spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0));
 
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return lsnr1.rcvCnt.get() >= 1;
+                }
+            }, 1000);
+
             final AtomicInteger sentCnt = new AtomicInteger(1);
 
             int errCnt = 0;
@@ -413,6 +424,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
             // Send message to establish connection.
             spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0));
 
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return lsnr1.rcvCnt.get() >= 1;
+                }
+            }, 1000);
+
             expCnt1.incrementAndGet();
 
             int errCnt = 0;
@@ -451,7 +468,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
                         ses1.resumeReads().get();
                     }
                     catch (IgniteCheckedException ignore) {
-                        // Can fail is ses1 was closed.
+                        // Can fail if ses1 was closed.
                     }
 
                     // Wait when session is closed, then try to open new connection from node1.
@@ -534,6 +551,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
             // Send message to establish connection.
             spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0));
 
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return lsnr1.rcvCnt.get() >= 1;
+                }
+            }, 1000);
+
             final AtomicInteger sentCnt = new AtomicInteger(1);
 
             int errCnt = 0;
@@ -686,11 +709,15 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
 
         Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
 
+        timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+        timeoutProcessor.start();
+
+        timeoutProcessor.onKernalStart();
+
         for (int i = 0; i < SPI_CNT; i++) {
             TcpCommunicationSpi spi = getSpi(i);
 
-            GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i);
-
             IgniteTestResources rsrcs = new IgniteTestResources();
 
             GridTestNode node = new GridTestNode(rsrcs.getNodeId());
@@ -701,10 +728,14 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
 
             ctx.setLocalNode(node);
 
+            ctx.timeoutProcessor(timeoutProcessor);
+
             spiRsrcs.add(rsrcs);
 
             rsrcs.inject(spi);
 
+            GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i);
+
             if (useSsl) {
                 IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite");
 
@@ -770,6 +801,14 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
      * @throws Exception If failed.
      */
     private void stopSpis() throws Exception {
+        if (timeoutProcessor != null) {
+            timeoutProcessor.onKernalStop(true);
+
+            timeoutProcessor.stop(true);
+
+            timeoutProcessor = null;
+        }
+
         for (CommunicationSpi<Message> spi : spis) {
             spi.onContextDestroyed();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index c4930a0..dd017fd 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
 import org.apache.ignite.internal.util.nio.GridNioServer;
@@ -47,6 +48,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
 import org.apache.ignite.testframework.GridSpiTestContext;
 import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.IgniteTestResources;
 import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
 import org.apache.ignite.testframework.junits.spi.GridSpiTest;
@@ -70,6 +72,9 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
     /** */
     private static final int SPI_CNT = 2;
 
+    /** */
+    private static GridTimeoutProcessor timeoutProcessor;
+
     /**
      *
      */
@@ -98,8 +103,6 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
 
         /** {@inheritDoc} */
         @Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) {
-            info("Test listener received message: " + msg);
-
             assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage);
 
             GridTestMessage msg0 = (GridTestMessage)msg;
@@ -171,6 +174,17 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
                     spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackC);
 
                     spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackC);
+
+                    if (j == 0) {
+                        final TestListener lsnr0 = (TestListener)spi0.getListener();
+                        final TestListener lsnr1 = (TestListener)spi1.getListener();
+
+                        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                            @Override public boolean apply() {
+                                return lsnr0.rcvCnt.get() >= 1 && lsnr1.rcvCnt.get() >= 1;
+                            }
+                        }, 1000);
+                    }
                 }
 
                 expMsgs += msgPerIter;
@@ -415,6 +429,12 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
 
         Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
 
+        timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+        timeoutProcessor.start();
+
+        timeoutProcessor.onKernalStart();
+
         for (int i = 0; i < SPI_CNT; i++) {
             TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit);
 
@@ -428,6 +448,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
 
             ctx.setLocalNode(node);
 
+            ctx.timeoutProcessor(timeoutProcessor);
+
             spiRsrcs.add(rsrcs);
 
             rsrcs.inject(spi);
@@ -436,6 +458,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
 
             node.setAttributes(spi.getNodeAttributes());
 
+            node.order(i);
+
             nodes.add(node);
 
             spi.spiStart(getTestGridName() + (i + 1));
@@ -491,6 +515,14 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
      * @throws Exception If failed.
      */
     private void stopSpis() throws Exception {
+        if (timeoutProcessor != null) {
+            timeoutProcessor.onKernalStop(true);
+
+            timeoutProcessor.stop(true);
+
+            timeoutProcessor = null;
+        }
+
         for (CommunicationSpi<Message> spi : spis) {
             spi.onContextDestroyed();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
index b530e36..9c59cb2 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
@@ -188,8 +188,7 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
         final CountDownLatch latch = new CountDownLatch(1);
 
         grid(0).events().localListen(new IgnitePredicate<Event>() {
-            @Override
-            public boolean apply(Event event) {
+            @Override public boolean apply(Event evt) {
                 latch.countDown();
 
                 return true;
@@ -239,14 +238,14 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
         }, 5000);
 
         try {
-            fut1.get();
+            fut1.get(1000);
         }
         catch (IgniteCheckedException e) {
             // No-op.
         }
 
         try {
-            fut2.get();
+            fut2.get(1000);
         }
         catch (IgniteCheckedException e) {
             // No-op.
@@ -297,7 +296,9 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
      */
     private static class TestCommunicationSpi extends TcpCommunicationSpi {
         /** {@inheritDoc} */
-        @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
+        @Override protected IgniteInternalFuture<GridCommunicationClient> createTcpClient(ClusterNode node, int connIdx)
+                throws IgniteCheckedException
+        {
             if (pred.apply(getLocalNode(), node)) {
                 Map<String, Object> attrs = new HashMap<>(node.attributes());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
index c21e6ce..baa1270 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
@@ -240,7 +240,9 @@ public class TcpCommunicationSpiFaultyClientTest extends GridCommonAbstractTest
      */
     private static class TestCommunicationSpi extends TcpCommunicationSpi {
         /** {@inheritDoc} */
-        @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
+        @Override protected IgniteInternalFuture<GridCommunicationClient> createTcpClient(ClusterNode node, int connIdx)
+                throws IgniteCheckedException
+        {
             if (PRED.apply(node)) {
                 Map<String, Object> attrs = new HashMap<>(node.attributes());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
index ff58509..004e86b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
@@ -1002,7 +1002,10 @@ public class HadoopExternalCommunication {
 
                 HandshakeFinish fin = new HandshakeFinish();
 
-                GridNioSession ses = nioSrvr.createSession(ch, F.asMap(HANDSHAKE_FINISH_META, fin)).get();
+                GridNioFuture<GridNioSession> sesFut =
+                    nioSrvr.createSession(ch, F.<Integer, Object>asMap(HANDSHAKE_FINISH_META, fin), false, null);
+
+                GridNioSession ses = sesFut.get();
 
                 client = new HadoopTcpNioCommunicationClient(ses);
 


[2/3] ignite git commit: ignite-4003 Async outgoing connections for communication SPI

Posted by ag...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 4c89a7c..7c0a44b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -35,6 +35,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -46,8 +47,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
@@ -66,7 +65,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.ipc.IpcEndpoint;
 import org.apache.ignite.internal.util.ipc.IpcToNioAdapter;
@@ -78,6 +77,7 @@ import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter;
 import org.apache.ignite.internal.util.nio.GridDirectParser;
 import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
 import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
 import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory;
 import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
 import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
@@ -90,9 +90,7 @@ import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
 import org.apache.ignite.internal.util.nio.GridShmemCommunicationClient;
 import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
-import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler;
 import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
-import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -136,7 +134,6 @@ import org.jsr166.LongAdder8;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
 
 /**
  * <tt>TcpCommunicationSpi</tt> is default communication SPI which uses
@@ -300,10 +297,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /** Connection index meta for session. */
     private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey();
+    /** Recovery descriptor meta key. */
+    private static final int RECOVERY_DESC_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
 
     /** Message tracker meta for session. */
     private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
 
+    /** Connection context meta key. */
+    private static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
     /**
      * Default local port range (value is <tt>100</tt>).
      * See {@link #setLocalPortRange(int)} for details.
@@ -338,6 +340,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** */
     public static final byte HANDSHAKE_MSG_TYPE = -3;
 
+    /** Ignite header message. */
+    private static final Message IGNITE_HEADER_MSG = new IgniteHeaderMessage();
+
+    /** Skip ack. For test purposes only. */
+    private boolean skipAck;
+
     /** */
     private ConnectGateway connectGate;
 
@@ -402,10 +410,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                         log.debug("Session was closed but there are unacknowledged messages, " +
                                             "will try to reconnect [rmtNode=" + outDesc.node().id() + ']');
 
-                                    DisconnectedSessionInfo disconnectData =
-                                        new DisconnectedSessionInfo(outDesc, connId.connectionIndex());
+                                    SessionInfo sesInfo =
+                                            new SessionInfo(ses, connId.connectionIndex(), SessionState.RECONNECT);
 
-                                    commWorker.addProcessDisconnectRequest(disconnectData);
+                                    commWorker.addSessionStateChangeRequest(sesInfo);
                                 }
                             }
                             else
@@ -431,14 +439,75 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 if (msg instanceof NodeIdMessage) {
                     sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0);
-                    connKey = new ConnectionKey(sndId, 0, -1);
+
+                    if (ses.remoteAddress() != null) { // Not shmem.
+                        assert !ses.accepted();
+
+                        ConnectContext ctx = ses.meta(CONN_CTX_META_KEY);
+
+                        assert ctx != null;
+                        assert ctx.expNodeId != null;
+
+                        if (sndId.equals(ctx.expNodeId)) {
+                            GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor();
+
+                            assert recoveryDesc != null;
+
+                            long connCnt = recoveryDesc.incrementConnectCount();
+
+                            connKey = new ConnectionKey(sndId, ctx.connIdx, connCnt);
+
+                            final ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey);
+
+                            assert old == null;
+
+                            ses.send(IGNITE_HEADER_MSG);
+
+                            ClusterNode locNode = getLocalNode();
+
+                            if (locNode == null) {
+                                commWorker.addSessionStateChangeRequest(new SessionInfo(ses, SessionState.CLOSE,
+                                    new IgniteCheckedException("Local node has not been started or " +
+                                        "fully initialized [isStopping=" + getSpiContext().isStopping() + ']')));
+
+                                return;
+                            }
+
+                            Integer handshakeConnIdx = useMultipleConnections(getSpiContext().node(sndId))
+                                    ? connKey.connectionIndex() : null;
+
+                            HandshakeMessage handshakeMsg;
+
+                            if (handshakeConnIdx != null)
+                                handshakeMsg = new HandshakeMessage2(locNode.id(), connCnt, recoveryDesc.received(),
+                                        handshakeConnIdx);
+                            else
+                                handshakeMsg = new HandshakeMessage(locNode.id(), connCnt, recoveryDesc.received());
+
+                            if (log.isDebugEnabled())
+                                log.debug("Write handshake message [rmtNode=" + sndId +
+                                    ", msg=" + handshakeMsg + ']');
+
+                            ses.send(handshakeMsg);
+                        }
+                        else {
+                            commWorker.addSessionStateChangeRequest(new SessionInfo(ses, SessionState.CLOSE,
+                                new IgniteCheckedException("Remote node ID is not as expected [expected=" +
+                                    ctx.expNodeId + ", rcvd=" + sndId + ']')));
+                        }
+
+                        return;
+                    }
+                    else
+                        connKey = new ConnectionKey(sndId, 0, -1);
                 }
                 else {
                     assert msg instanceof HandshakeMessage : msg;
 
                     HandshakeMessage msg0 = (HandshakeMessage)msg;
 
-                    sndId = ((HandshakeMessage)msg).nodeId();
+                    sndId = msg0.nodeId();
+
                     connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount());
                 }
 
@@ -479,30 +548,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (reserve)
                         connectedNew(recoveryDesc, ses, true);
                     else {
-                        if (c.failed) {
-                            ses.send(new RecoveryLastReceivedMessage(-1));
-
-                            for (GridNioSession ses0 : nioSrvr.sessions()) {
-                                ConnectionKey key0 = ses0.meta(CONN_IDX_META);
-
-                                if (ses0.accepted() && key0 != null &&
-                                    key0.nodeId().equals(connKey.nodeId()) &&
-                                    key0.connectionIndex() == connKey.connectionIndex() &&
-                                    key0.connectCount() < connKey.connectCount())
-                                    ses0.close();
-                            }
+                        for (GridNioSession ses0 : nioSrvr.sessions()) {
+                            ConnectionKey key0 = ses0.meta(CONN_IDX_META);
+
+                            if (ses0.accepted() && key0 != null &&
+                                key0.nodeId().equals(connKey.nodeId()) &&
+                                key0.connectionIndex() == connKey.connectionIndex() &&
+                                key0.connectCount() < connKey.connectCount())
+                                ses0.close();
                         }
                     }
                 }
                 else {
                     assert connKey.connectionIndex() >= 0 : connKey;
 
-                    GridCommunicationClient[] curClients = clients.get(sndId);
-
-                    GridCommunicationClient oldClient =
-                        curClients != null && connKey.connectionIndex() < curClients.length ?
-                            curClients[connKey.connectionIndex()] :
-                            null;
+                    GridCommunicationClient oldClient = nodeClient(sndId, connKey.connectionIndex());
 
                     boolean hasShmemClient = false;
 
@@ -531,10 +591,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey);
 
                     if (oldFut == null) {
-                        curClients = clients.get(sndId);
-
-                        oldClient = curClients != null && connKey.connectionIndex() < curClients.length ?
-                            curClients[connKey.connectionIndex()] : null;
+                        oldClient = nodeClient(sndId, connKey.connectionIndex());
 
                         if (oldClient != null) {
                             if (oldClient instanceof GridTcpNioCommunicationClient) {
@@ -578,7 +635,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         }
                     }
                     else {
-                        if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
+                        if (oldFut instanceof ReserveClientFuture && locNode.order() < rmtNode.order()) {
                             if (log.isDebugEnabled()) {
                                 log.debug("Received incoming connection from remote node while " +
                                     "connecting to this node, rejecting [locNode=" + locNode.id() +
@@ -604,9 +661,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 ConnectionKey connKey = ses.meta(CONN_IDX_META);
 
                 if (connKey == null) {
-                    assert ses.accepted() : ses;
-
-                    if (!connectGate.tryEnter()) {
+                    if (ses.accepted() && !connectGate.tryEnter()) { // Outgoing connection already entered gate.
                         if (log.isDebugEnabled())
                             log.debug("Close incoming connection, failed to enter gateway.");
 
@@ -619,7 +674,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         onFirstMessage(ses, msg);
                     }
                     finally {
-                        connectGate.leave();
+                        if (ses.accepted())
+                            connectGate.leave();
                     }
                 }
                 else {
@@ -631,13 +687,49 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         if (recovery != null) {
                             RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg;
 
+                            long rcvCnt = msg0.received();
+
                             if (log.isDebugEnabled()) {
                                 log.debug("Received recovery acknowledgement [rmtNode=" + connKey.nodeId() +
                                     ", connIdx=" + connKey.connectionIndex() +
                                     ", rcvCnt=" + msg0.received() + ']');
                             }
 
-                            recovery.ackReceived(msg0.received());
+                            ConnectContext ctx = ses.meta(CONN_CTX_META_KEY);
+
+                            if (!ses.accepted() && ctx != null && ctx.rcvCnt == Long.MIN_VALUE) {
+                                HandshakeTimeoutObject timeoutObj = ctx.handshakeTimeoutObj;
+
+                                Exception err = null;
+
+                                if (timeoutObj != null && !cancelHandshakeTimeout(timeoutObj)) {
+                                        err = new HandshakeTimeoutException("Failed to perform handshake due to timeout " +
+                                            "(consider increasing 'connectionTimeout' configuration property).");
+                                }
+
+                                if (rcvCnt == -1 || err != null) {
+                                    if (ses.remoteAddress() != null) {
+                                        SessionInfo sesInfo = new SessionInfo(ses, SessionState.CLOSE, err);
+
+                                        commWorker.addSessionStateChangeRequest(sesInfo);
+                                    }
+                                }
+                                else {
+                                    ctx.rcvCnt = rcvCnt;
+
+                                    recovery.onHandshake(rcvCnt);
+
+                                    nioSrvr.resend(ses);
+
+                                    recovery.onConnected();
+
+                                    SessionInfo sesInfo = new SessionInfo(ses, connKey.idx, SessionState.READY);
+
+                                    commWorker.addSessionStateChangeRequest(sesInfo);
+                                }
+                            }
+                            else
+                                recovery.ackReceived(rcvCnt);
 
                             return;
                         }
@@ -655,7 +747,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                         ", rcvCnt=" + rcvCnt + ']');
                                 }
 
-                                ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt));
+                                if (!skipAck)
+                                    ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt));
 
                                 recovery.lastAcknowledged(rcvCnt);
                             }
@@ -706,7 +799,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 assert connKey != null && connKey.connectionIndex() >= 0 : connKey;
                 assert !usePairedConnections(node);
 
-                recovery.onHandshake(rcvCnt);
+                if (ses.accepted())
+                    recovery.onHandshake(rcvCnt);
 
                 ses.inRecoveryDescriptor(recovery);
                 ses.outRecoveryDescriptor(recovery);
@@ -772,9 +866,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 /** */
                 private final ClusterNode rmtNode;
 
-                /** */
-                private boolean failed;
-
                 /**
                  * @param ses Incoming session.
                  * @param recoveryDesc Recovery descriptor.
@@ -791,8 +882,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 /** {@inheritDoc} */
                 @Override public void apply(Boolean success) {
                     try {
-                        failed = !success;
-
                         if (success) {
                             IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
                                 @Override public void apply(IgniteInternalFuture<?> msgFut) {
@@ -913,10 +1002,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
                     else {
                         try {
-                            fut.onDone();
+                            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1), new IgniteInClosure<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?> msgFut) {
+                                    try {
+                                        msgFut.get();
+                                    } catch (IgniteCheckedException e) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Failed to send recovery handshake " +
+                                                    "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');
+
+                                        recoveryDesc.release();
+                                    } finally {
+                                        fut.onDone();
+
+                                        clientFuts.remove(connKey, fut);
+
+                                        ses.close();
+                                    }
+                                }
+                            });
                         }
-                        finally {
-                            clientFuts.remove(connKey, fut);
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to send message: " + e, e);
                         }
                     }
                 }
@@ -1721,12 +1828,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             nioSrvr.dumpStats();
     }
 
-    /** */
-    private final ThreadLocal<Integer> threadConnIdx = new ThreadLocal<>();
-
-    /** */
-    private final AtomicInteger connIdx = new AtomicInteger();
-
     /** {@inheritDoc} */
     @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
         initFailureDetectionTimeout();
@@ -1982,9 +2083,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
+                        UUID rmtNodeId = null;
+
                         ConnectionKey key = ses.meta(CONN_IDX_META);
 
-                        return key != null ? formatter.writer(key.nodeId()) : null;
+                        if (key != null)
+                            rmtNodeId = key.nodeId();
+                        else {
+                            ConnectContext ctx = ses.meta(CONN_CTX_META_KEY);
+
+                            if (ctx != null)
+                                rmtNodeId = ctx.expNodeId;
+                        }
+
+                        return key != null ? formatter.writer(rmtNodeId) : null;
                     }
                 };
 
@@ -1994,7 +2106,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>() {
                     @Override public boolean apply(Message msg) {
-                        return msg instanceof RecoveryLastReceivedMessage;
+                        return msg instanceof NotRecoverable;
                     }
                 };
 
@@ -2329,52 +2441,110 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (node.id().equals(locNode.id()))
             notifyListener(node.id(), msg, NOOP);
         else {
-            GridCommunicationClient client = null;
-
             int connIdx = useMultipleConnections(node) ? connPlc.connectionIndex() : 0;
 
+            send(node, connIdx, msg, ackC);
+        }
+    }
+
+    /**
+     * Try to send message.
+     */
+    private void send(final ClusterNode node,
+                      final int connIdx,
+                      final Message msg,
+                      final IgniteInClosure<IgniteException> ackC
+    ) {
+        final GridCommunicationClient client = nodeClient(node.id(), connIdx);
+
+        if (client != null && client.reserve()) {
             try {
-                boolean retry;
+                send0(client, node, msg, ackC);
+            }
+            catch (IgniteCheckedException e) {
+                if (removeNodeClient(node.id(), client))
+                    client.forceClose();
 
-                do {
-                    client = reserveClient(node, connIdx);
+                throw new IgniteSpiException("Failed to send message to remote node: " + node, e);
+            }
+        }
+        else {
+            if (client != null)
+                removeNodeClient(node.id(), client);
 
-                    UUID nodeId = null;
+            IgniteInternalFuture<GridCommunicationClient> clientFut = reserveClient(node, connIdx);
 
-                    if (!client.async())
-                        nodeId = node.id();
+            clientFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+                @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut) {
+                    GridCommunicationClient client = null;
 
-                    retry = client.sendMessage(nodeId, msg, ackC);
+                    try {
+                        client = fut.get();
 
-                    client.release();
+                        send0(client, node, msg, ackC);
+                    }
+                    catch (IgniteCheckedException e) {
+                        LT.error(log, e, "Unexpected error occurred during sending of message to node: " + node.id());
 
-                    if (!retry)
-                        sentMsgsCnt.increment();
-                    else {
-                        removeNodeClient(node.id(), client);
+                        if (client != null && removeNodeClient(node.id(), client))
+                            client.forceClose();
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * @param client Client.
+     * @param node Node.
+     * @param msg Message.
+     * @param ackC Ack closure.
+     */
+    private void send0(
+        GridCommunicationClient client,
+        ClusterNode node,
+        Message msg,
+        IgniteInClosure<IgniteException> ackC
+    ) throws IgniteCheckedException {
+        assert client != null;
 
-                        ClusterNode node0 = getSpiContext().node(node.id());
+        UUID nodeId = null;
 
-                        if (node0 == null)
-                            throw new IgniteCheckedException("Failed to send message to remote node " +
-                                "(node has left the grid): " + node.id());
-                    }
+        if (!client.async())
+            nodeId = node.id();
 
-                    client = null;
-                }
-                while (retry);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteSpiException("Failed to send message to remote node: " + node, e);
-            }
-            finally {
-                if (client != null && removeNodeClient(node.id(), client))
-                    client.forceClose();
+        boolean retry = client.sendMessage(nodeId, msg, ackC);
+
+        client.release();
+
+        if (!retry)
+            sentMsgsCnt.increment();
+        else {
+            removeNodeClient(node.id(), client);
+
+            ClusterNode node0 = getSpiContext().node(node.id());
+
+            if (node0 == null) {
+                U.warn(log, "Failed to send message to remote node (node has left the grid): " + node.id());
+
+                return;
             }
+
+            send(node, client.connectionIndex(), msg, ackC);
         }
     }
 
     /**
+     * @param nodeId Node id.
+     * @param connIdx Connection index.
+     */
+    private GridCommunicationClient nodeClient(UUID nodeId, int connIdx) {
+        GridCommunicationClient[] curClients = clients.get(nodeId);
+
+        return curClients != null && connIdx < curClients.length ? curClients[connIdx] : null;
+    }
+
+    /**
      * @param nodeId Node ID.
      * @param rmvClient Client to remove.
      * @return {@code True} if client was removed.
@@ -2443,95 +2613,245 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * @param node Node to which client should be open.
      * @param connIdx Connection index.
      * @return The existing or just created client.
-     * @throws IgniteCheckedException Thrown if any exception occurs.
      */
-    private GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
-        assert node != null;
-        assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx;
+    private IgniteInternalFuture<GridCommunicationClient> reserveClient(ClusterNode node, int connIdx) {
+        GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
 
-        UUID nodeId = node.id();
+        tryReserveClient(node, connIdx, fut);
 
-        while (true) {
-            GridCommunicationClient[] curClients = clients.get(nodeId);
+        return fut;
+    }
+
+    /**
+     * @param node Node.
+     * @param connIdx Connection index.
+     * @param fut Future.
+     */
+    private void tryReserveClient(
+            final ClusterNode node,
+            final int connIdx,
+            final GridFutureAdapter<GridCommunicationClient> fut)
+    {
+        final ReserveClientFuture reserveFut = new ReserveClientFuture(node, connIdx);
+
+        reserveFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+            @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut0) {
+                try {
+                    GridCommunicationClient client = fut0.get();
+
+                    if (client != null)
+                        fut.onDone(client);
+                    else
+                        tryReserveClient(node, connIdx, fut);
+                }
+                catch (IgniteCheckedException e) {
+                    fut.onDone(e);
+                }
+            }
+        });
+
+        try {
+            reserveFut.reserve();
+        }
+        catch (Exception e) {
+            fut.onDone(e);
+        }
+    }
+
+    /**
+     *
+     */
+    private class ReserveClientFuture extends GridFutureAdapter<GridCommunicationClient> {
+        /** Node. */
+        private final ClusterNode node;
+
+        /** Connection index. */
+        private final int connIdx;
+
+        /**
+         * @param node Node.
+         */
+        ReserveClientFuture(ClusterNode node, int connIdx) {
+            assert node != null;
+            assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx;
+
+            this.node = node;
+            this.connIdx = connIdx;
+        }
+
+        /**
+         *
+         */
+        void reserve() {
+            final UUID nodeId = node.id();
 
-            GridCommunicationClient client = curClients != null && connIdx < curClients.length ?
-                curClients[connIdx] : null;
+            final GridCommunicationClient client = nodeClient(nodeId, connIdx);
+
+            final GridFutureAdapter<GridCommunicationClient> connFut;
 
             if (client == null) {
-                if (stopping)
-                    throw new IgniteSpiException("Node is stopping.");
+                if (stopping) {
+                    onDone(new IgniteSpiException("Node is stopping."));
+
+                    return;
+                }
 
                 // Do not allow concurrent connects.
-                GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture();
+                connFut = this;
 
-                ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);
+                final ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);
 
-                GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);
+                final GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, connFut);
 
                 if (oldFut == null) {
                     try {
-                        GridCommunicationClient[] curClients0 = clients.get(nodeId);
-
-                        GridCommunicationClient client0 = curClients0 != null && connIdx < curClients0.length ?
-                            curClients0[connIdx] : null;
+                        GridCommunicationClient client0 = nodeClient(nodeId, connIdx);
 
                         if (client0 == null) {
-                            client0 = createNioClient(node, connIdx);
-
-                            if (client0 != null) {
-                                addNodeClient(node, connIdx, client0);
+                            IgniteInternalFuture<GridCommunicationClient> clientFut = createNioClient(node, connIdx);
 
-                                if (client0 instanceof GridTcpNioCommunicationClient) {
-                                    GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0);
-
-                                    if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Session was closed after client creation, will retry " +
-                                                "[node=" + node + ", client=" + client0 + ']');
+                            clientFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+                                @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut) {
+                                    try {
+                                        GridCommunicationClient client0 = fut.get();
+
+                                        if (client0 != null) {
+                                            addNodeClient(node, connIdx, client0);
+
+                                            if (client0 instanceof GridTcpNioCommunicationClient) {
+                                                GridTcpNioCommunicationClient tcpClient =
+                                                    ((GridTcpNioCommunicationClient)client0);
+
+                                                if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) {
+                                                    if (log.isDebugEnabled())
+                                                        log.debug("Session was closed after client creation, " +
+                                                                "will retry [node=" + node + ", client=" + client0 + ']');
+
+                                                    client0 = null;
+                                                }
+                                            }
+
+                                            if (client0 == null) {
+                                                clientFuts.remove(connKey, connFut);
+
+                                                onDone();
+                                            }
+                                            else if (client0.reserve()) {
+                                                clientFuts.remove(connKey, connFut);
+
+                                                onDone(client0);
+                                            }
+                                            else {
+                                                clientFuts.remove(connKey, connFut);
+
+                                                removeNodeClient(nodeId, client0);
+
+                                                onDone();
+                                            }
+                                        }
+                                        else {
+                                            final long currTime = U.currentTimeMillis();
+
+                                            addTimeoutObject(new IgniteSpiTimeoutObject() {
+                                                private final IgniteUuid id = IgniteUuid.randomUuid();
+
+                                                @Override public IgniteUuid id() {
+                                                    return id;
+                                                }
+
+                                                @Override public long endTime() {
+                                                    return currTime + 200;
+                                                }
+
+                                                @Override public void onTimeout() {
+                                                    SessionInfo sesInfo = new SessionInfo(null, SessionState.RETRY,
+                                                        new Runnable() {
+                                                            @Override public void run() {
+                                                                clientFuts.remove(connKey, connFut);
+
+                                                                onDone();
+                                                            }
+                                                        });
+
+                                                    commWorker.addSessionStateChangeRequest(sesInfo);
+                                                }
+                                            });
+                                        }
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        clientFuts.remove(connKey, connFut);
 
-                                        client0 = null;
+                                        onDone(e);
                                     }
                                 }
-                            }
-                            else
-                                U.sleep(200);
+                            });
                         }
+                        else {
+                            assert connIdx == client0.connectionIndex() : client0;
 
-                        fut.onDone(client0);
+                            if (client0.reserve())
+                                onDone(client0);
+                            else {
+                                removeNodeClient(nodeId, client0);
+
+                                onDone();
+                            }
+                        }
                     }
                     catch (Throwable e) {
-                        fut.onDone(e);
+                        connFut.onDone(e);
 
                         if (e instanceof Error)
                             throw (Error)e;
                     }
-                    finally {
-                        clientFuts.remove(connKey, fut);
-                    }
                 }
-                else
-                    fut = oldFut;
+                else {
+                    oldFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+                        @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut) {
+                            try {
+                                GridCommunicationClient client0 = fut.get();
 
-                client = fut.get();
+                                if (client0 == null) {
+                                    clientFuts.remove(connKey, oldFut);
 
-                if (client == null)
-                    continue;
+                                    onDone();
+                                }
+                                else if (client0.reserve()) {
+                                    clientFuts.remove(connKey, oldFut);
 
-                if (getSpiContext().node(nodeId) == null) {
-                    if (removeNodeClient(nodeId, client))
-                        client.forceClose();
+                                    onDone(client0);
+                                }
+                                else {
+                                    clientFuts.remove(connKey, oldFut);
+
+                                    removeNodeClient(nodeId, client0);
 
-                    throw new IgniteSpiException("Destination node is not in topology: " + node.id());
+                                    onDone();
+                                }
+                            }
+                            catch (IgniteCheckedException e) {
+                                onDone(e);
+                            }
+                        }
+                    });
                 }
             }
+            else {
+                assert connIdx == client.connectionIndex() : client;
+
+                if (client.reserve())
+                    onDone(client);
+                else {
+                    removeNodeClient(nodeId, client);
 
-            assert connIdx == client.connectionIndex() : client;
+                    onDone();
+                }
+            }
+        }
 
-            if (client.reserve())
-                return client;
-            else
-                // Client has just been closed by idle worker. Help it and try again.
-                removeNodeClient(nodeId, client);
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ReserveClientFuture.class, this);
         }
     }
 
@@ -2539,10 +2859,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * @param node Node to create client for.
      * @param connIdx Connection index.
      * @return Client.
-     * @throws IgniteCheckedException If failed.
      */
-    @Nullable private GridCommunicationClient createNioClient(ClusterNode node, int connIdx)
-        throws IgniteCheckedException {
+    protected IgniteInternalFuture<GridCommunicationClient> createNioClient(ClusterNode node, int connIdx) {
         assert node != null;
 
         Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT));
@@ -2550,7 +2868,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         ClusterNode locNode = getSpiContext().localNode();
 
         if (locNode == null)
-            throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)");
+            return new GridFinishedFuture<>(
+                new IgniteCheckedException("Failed to create NIO client (local node is stopping)")
+            );
 
         if (log.isDebugEnabled())
             log.debug("Creating NIO client to node: " + node);
@@ -2567,7 +2887,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 if (log.isDebugEnabled())
                     log.debug("Shmem client created: " + client);
 
-                return client;
+                return new GridFinishedFuture<>(client);
             }
             catch (IgniteCheckedException e) {
                 if (e.hasCause(IpcOutOfSystemResourcesException.class))
@@ -2578,21 +2898,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 else if (log.isDebugEnabled())
                     log.debug("Failed to establish shared memory connection with local node (node has left): " +
                         node.id());
+
+                return new GridFinishedFuture<>(e);
             }
         }
 
-        connectGate.enter();
 
         try {
-            GridCommunicationClient client = createTcpClient(node, connIdx);
-
-            if (log.isDebugEnabled())
-                log.debug("TCP client created: " + client);
-
-            return client;
+            return createTcpClient(node, connIdx);
         }
-        finally {
-            connectGate.leave();
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(e);
         }
     }
 
@@ -2641,12 +2957,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             try {
-                safeHandshake(client,
-                    null,
-                    node.id(),
-                    timeoutHelper.nextTimeoutChunk(connTimeout0),
-                    null,
-                    null);
+                safeHandshake(client, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
             }
             catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
                 client.forceClose();
@@ -2708,7 +3019,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             ConnectionKey id = ses.meta(CONN_IDX_META);
 
             if (id != null) {
-                ClusterNode node = getSpiContext().node(id.nodeId);
+                ClusterNode node = getSpiContext().node(id.nodeId());
 
                 if (node != null && node.isClient()) {
                     String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " +
@@ -2731,507 +3042,502 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      *
      * @param node Remote node.
      * @param connIdx Connection index.
-     * @return Client.
+     * @return Client future.
      * @throws IgniteCheckedException If failed.
      */
-    protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
-        Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS));
-        Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
-        Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
-        Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
-
-        boolean isRmtAddrsExist = (!F.isEmpty(rmtAddrs0) && boundPort != null);
-        boolean isExtAddrsExist = !F.isEmpty(extAddrs);
+    protected IgniteInternalFuture<GridCommunicationClient> createTcpClient(ClusterNode node, int connIdx)
+            throws IgniteCheckedException
+    {
+        TcpClientFuture fut = new TcpClientFuture(node, connIdx);
 
-        if (!isRmtAddrsExist && !isExtAddrsExist)
-            throw new IgniteCheckedException("Failed to send message to the destination node. Node doesn't have any " +
-                "TCP communication addresses or mapped external addresses. Check configuration and make sure " +
-                "that you use the same communication SPI on all nodes. Remote node id: " + node.id());
+        connectGate.enter();
 
-        LinkedHashSet<InetSocketAddress> addrs;
+        fut.connect();
 
-        // Try to connect first on bound addresses.
-        if (isRmtAddrsExist) {
-            List<InetSocketAddress> addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort));
+        fut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+            @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut0) {
+                connectGate.leave();
+            }
+        });
 
-            boolean sameHost = U.sameMacs(getSpiContext().localNode(), node);
+        return fut;
+    }
 
-            Collections.sort(addrs0, U.inetAddressesComparator(sameHost));
+    /**
+     * @param timeoutObj Timeout object.
+     */
+    private boolean cancelHandshakeTimeout(HandshakeTimeoutObject timeoutObj) {
+        boolean cancelled = timeoutObj.cancel();
 
-            addrs = new LinkedHashSet<>(addrs0);
-        }
-        else
-            addrs = new LinkedHashSet<>();
+        if (cancelled)
+            removeTimeoutObject(timeoutObj);
 
-        // Then on mapped external addresses.
-        if (isExtAddrsExist)
-            addrs.addAll(extAddrs);
+        return cancelled;
+    }
 
-        boolean conn = false;
-        GridCommunicationClient client = null;
-        IgniteCheckedException errs = null;
+    /**
+     *
+     */
+    private class TcpClientFuture extends GridFutureAdapter<GridCommunicationClient> {
+        /** Node. */
+        private final ClusterNode node;
 
-        int connectAttempts = 1;
+        /** Timeout helper. */
+        private final IgniteSpiOperationTimeoutHelper timeoutHelper =
+            new IgniteSpiOperationTimeoutHelper(TcpCommunicationSpi.this);
 
-        for (InetSocketAddress addr : addrs) {
-            long connTimeout0 = connTimeout;
+        /** Addresses. */
+        private Collection<InetSocketAddress> addrs;
 
-            int attempt = 1;
+        /** Addresses it. */
+        private Iterator<InetSocketAddress> addrsIt;
 
-            IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this);
+        /** Current addresses. */
+        private volatile InetSocketAddress currAddr;
 
-            while (!conn) { // Reconnection on handshake timeout.
-                try {
-                    SocketChannel ch = SocketChannel.open();
+        /** Err. */
+        private volatile IgniteCheckedException err;
 
-                    ch.configureBlocking(true);
+        /** Connect attempts. */
+        private volatile int connectAttempts;
 
-                    ch.socket().setTcpNoDelay(tcpNoDelay);
-                    ch.socket().setKeepAlive(true);
+        /** Attempts. */
+        private volatile int attempt;
 
-                    if (sockRcvBuf > 0)
-                        ch.socket().setReceiveBufferSize(sockRcvBuf);
+        /** Connection index. */
+        private volatile int connIdx;
 
-                    if (sockSndBuf > 0)
-                        ch.socket().setSendBufferSize(sockSndBuf);
+        /**
+         * @param node Node.
+         */
+        TcpClientFuture(ClusterNode node, int connIdx) {
+            this.node = node;
+            this.connIdx = connIdx;
+        }
 
-                    if (getSpiContext().node(node.id()) == null) {
-                        U.closeQuiet(ch);
+        /**
+         * Connects to remote node.
+         */
+        void connect() {
+            try {
+                addrs = addrs();
+            }
+            catch (IgniteCheckedException e) {
+                onDone(e);
 
-                        throw new ClusterTopologyCheckedException("Failed to send message " +
-                            "(node left topology): " + node);
-                    }
+                return;
+            }
 
-                    ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);
+            addrsIt = addrs.iterator();
 
-                    GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);
+            tryConnect(true);
+        }
 
-                    if (!recoveryDesc.reserve()) {
-                        U.closeQuiet(ch);
+        /**
+         *
+         */
+        private void tryConnect(boolean next) {
+            if (next && !addrsIt.hasNext()) {
+                IgniteCheckedException err0 = err;
 
-                        return null;
-                    }
+                assert err0 != null;
 
-                    long rcvCnt = -1;
+                UUID nodeId = node.id();
 
-                    Map<Integer, Object> meta = new HashMap<>();
+                if (getSpiContext().node(nodeId) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
+                        X.hasCause(err, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class,
+                                IgniteSpiOperationTimeoutException.class))
+                {
+                    LT.warn(log, "TcpCommunicationSpi failed to establish connection to node, node will be " +
+                        "dropped from cluster [" + "rmtNode=" + node + ", err=" + err +
+                        ", connectErrs=" + Arrays.toString(err.getSuppressed()) + ']');
 
-                    GridSslMeta sslMeta = null;
+                    getSpiContext().failNode(nodeId, "TcpCommunicationSpi failed to establish connection to node " +
+                        "[rmtNode=" + node + ", errs=" + err + ", connectErrs=" + Arrays.toString(err.getSuppressed()) + ']');
+                }
 
-                    try {
-                        ch.socket().connect(addr, (int)timeoutHelper.nextTimeoutChunk(connTimeout));
+                onDone(err0);
 
-                        if (isSslEnabled()) {
-                            meta.put(SSL_META.ordinal(), sslMeta = new GridSslMeta());
+                return;
+            }
 
-                            SSLEngine sslEngine = ignite.configuration().getSslContextFactory().create().createSSLEngine();
+            if (next) {
+                attempt = 0;
 
-                            sslEngine.setUseClientMode(true);
+                connectAttempts = 0;
 
-                            sslMeta.sslEngine(sslEngine);
-                        }
+                currAddr = addrsIt.next();
+            }
 
-                        Integer handshakeConnIdx = useMultipleConnections(node) ? connIdx : null;
+            InetSocketAddress addr = currAddr;
 
-                        rcvCnt = safeHandshake(ch,
-                            recoveryDesc,
-                            node.id(),
-                            timeoutHelper.nextTimeoutChunk(connTimeout0),
-                            sslMeta,
-                            handshakeConnIdx);
+            try {
+                final SocketChannel ch = SocketChannel.open();
 
-                        if (rcvCnt == -1)
-                            return null;
-                    }
-                    finally {
-                        if (recoveryDesc != null && rcvCnt == -1)
-                            recoveryDesc.release();
-                    }
+                ch.configureBlocking(false);
 
-                    try {
-                        meta.put(CONN_IDX_META, connKey);
+                ch.socket().setTcpNoDelay(tcpNoDelay);
+                ch.socket().setKeepAlive(true);
 
-                        if (recoveryDesc != null) {
-                            recoveryDesc.onHandshake(rcvCnt);
+                if (sockRcvBuf > 0)
+                    ch.socket().setReceiveBufferSize(sockRcvBuf);
 
-                            meta.put(-1, recoveryDesc);
-                        }
+                if (sockSndBuf > 0)
+                    ch.socket().setSendBufferSize(sockSndBuf);
 
-                        GridNioSession ses = nioSrvr.createSession(ch, meta).get();
+                if (getSpiContext().node(node.id()) == null) {
+                    U.closeQuiet(ch);
 
-                        client = new GridTcpNioCommunicationClient(connIdx, ses, log);
+                    onError(new ClusterTopologyCheckedException("Failed to send message " +
+                        "(node left topology): " + node));
 
-                        conn = true;
-                    }
-                    finally {
-                        if (!conn) {
-                            if (recoveryDesc != null)
-                                recoveryDesc.release();
-                        }
-                    }
+                    return;
                 }
-                catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
-                    if (client != null) {
-                        client.forceClose();
 
-                        client = null;
-                    }
+                final ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);
 
-                    if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
-                        timeoutHelper.checkFailureTimeoutReached(e))) {
+                final GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);
 
-                        String msg = "Handshake timed out (failure detection timeout is reached) " +
-                            "[failureDetectionTimeout=" + failureDetectionTimeout() + ", addr=" + addr + ']';
+                if (!recoveryDesc.reserve()) {
+                    U.closeQuiet(ch);
 
-                        onException(msg, e);
+                    onDone();
 
-                        if (log.isDebugEnabled())
-                            log.debug(msg);
+                    return;
+                }
 
-                        if (errs == null)
-                            errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
-                                "Make sure that each ComputeTask and cache Transaction has a timeout set " +
-                                "in order to prevent parties from waiting forever in case of network issues " +
-                                "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+                final Map<Integer, Object> meta = new HashMap<>();
 
-                        errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+                final ConnectContext ctx = new ConnectContext();
 
-                        break;
-                    }
+                ctx.expNodeId = node.id();
 
-                    assert !failureDetectionTimeoutEnabled();
+                ctx.tcpClientFut = this;
 
-                    onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
-                        ", addr=" + addr + ']', e);
+                ctx.connIdx = connIdx;
 
-                    if (log.isDebugEnabled())
-                        log.debug(
-                            "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
-                                ", addr=" + addr + ", err=" + e + ']');
+                meta.put(CONN_CTX_META_KEY, ctx);
 
-                    if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
-                        if (log.isDebugEnabled())
-                            log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
-                                "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
-                                ", attempt=" + attempt + ", reconCnt=" + reconCnt +
-                                ", err=" + e.getMessage() + ", addr=" + addr + ']');
+                meta.put(RECOVERY_DESC_META_KEY, recoveryDesc);
 
-                        if (errs == null)
-                            errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
-                                "Make sure that each ComputeTask and cache Transaction has a timeout set " +
-                                "in order to prevent parties from waiting forever in case of network issues " +
-                                "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+                final int timeoutChunk = (int)timeoutHelper.nextTimeoutChunk(connTimeout);
 
-                        errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+                final int attempt0 = attempt;
 
-                        break;
-                    }
-                    else {
-                        attempt++;
+                final ConnectionTimeoutObject connTimeoutObj = new ConnectionTimeoutObject(ch, meta,
+                        U.currentTimeMillis() + timeoutChunk * (attempt0 > 0 ? attempt0 * 2 : 1));
 
-                        connTimeout0 *= 2;
+                addTimeoutObject(connTimeoutObj);
 
-                        // Continue loop.
-                    }
-                }
-                catch (Exception e) {
-                    if (client != null) {
-                        client.forceClose();
+                boolean connect = ch.connect(addr);
 
-                        client = null;
-                    }
+                IgniteInClosure<IgniteInternalFuture<GridNioSession>> lsnr0 = new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
+                    @Override public void apply(final IgniteInternalFuture<GridNioSession> fut) {
+                        GridNioSession ses = null;
 
-                    onException("Client creation failed [addr=" + addr + ", err=" + e + ']', e);
+                        try {
+                            ses = fut.get();
 
-                    if (log.isDebugEnabled())
-                        log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
+                            boolean canceled = connTimeoutObj.cancel();
 
-                    boolean failureDetThrReached = timeoutHelper.checkFailureTimeoutReached(e);
+                            if (canceled)
+                                removeTimeoutObject(connTimeoutObj);
+                            else {
+                                final GridNioSession ses0 = ses;
 
-                    if (failureDetThrReached)
-                        LT.warn(log, "Connect timed out (consider increasing 'failureDetectionTimeout' " +
-                            "configuration property) [addr=" + addr + ", failureDetectionTimeout=" +
-                            failureDetectionTimeout() + ']');
-                    else if (X.hasCause(e, SocketTimeoutException.class))
-                        LT.warn(log, "Connect timed out (consider increasing 'connTimeout' " +
-                            "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']');
+                                Runnable clo = new Runnable() {
+                                    @Override public void run() {
+                                        GridNioFuture<Boolean> fut = nioSrvr.close(ses0);
 
-                    if (errs == null)
-                        errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
-                            "Make sure that each ComputeTask and cache Transaction has a timeout set " +
-                            "in order to prevent parties from waiting forever in case of network issues " +
-                            "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+                                        final SocketTimeoutException e = new SocketTimeoutException("Connect timed " +
+                                            "(consider increasing 'connTimeout' configuration property) [addr=" +
+                                            currAddr + ", connTimeout=" + connTimeout + ']');
 
-                    errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+                                        fut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+                                            @Override public void apply(IgniteInternalFuture<Boolean> fut0) {
+                                                Runnable clo = new Runnable() {
+                                                    @Override public void run() {
+                                                        onError(e);
+                                                    }
+                                                };
 
-                    // Reconnect for the second time, if connection is not established.
-                    if (!failureDetThrReached && connectAttempts < 2 &&
-                        (e instanceof ConnectException || X.hasCause(e, ConnectException.class))) {
-                        connectAttempts++;
+                                                SessionInfo sesInfo = new SessionInfo(null, SessionState.RETRY, clo);
 
-                        continue;
-                    }
+                                                commWorker.addSessionStateChangeRequest(sesInfo);
+                                            }
+                                        });
+                                    }
+                                };
 
-                    break;
-                }
-            }
+                                commWorker.addSessionStateChangeRequest(new SessionInfo(null, SessionState.RETRY, clo));
 
-            if (conn)
-                break;
-        }
+                                return;
+                            }
 
-        if (client == null) {
-            assert errs != null;
+                            int timeoutChunk1 = (int) timeoutHelper.nextTimeoutChunk(connTimeout);
 
-            if (X.hasCause(errs, ConnectException.class))
-                LT.warn(log, "Failed to connect to a remote node " +
-                    "(make sure that destination node is alive and " +
-                    "operating system firewall is disabled on local and remote hosts) " +
-                    "[addrs=" + addrs + ']');
+                            long time = U.currentTimeMillis() + timeoutChunk1 * (attempt0 > 0 ? attempt0 * 2 : 1);
 
-            if (getSpiContext().node(node.id()) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
-                X.hasCause(errs, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class,
-                    IgniteSpiOperationTimeoutException.class)) {
-                LT.warn(log, "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
-                    "cluster [" +
-                    "rmtNode=" + node +
-                    ", err=" + errs +
-                    ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
+                            HandshakeTimeoutObject<SocketChannel> handshakeTimeoutObj =
+                                    new HandshakeTimeoutObject<>(ch, TcpClientFuture.this, time);
 
-                getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" +
-                    "rmtNode=" + node +
-                    ", errs=" + errs +
-                    ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
-            }
+                            ctx.handshakeTimeoutObj = handshakeTimeoutObj;
 
-            throw errs;
-        }
+                            addTimeoutObject(handshakeTimeoutObj);
+                        }
+                        catch (final IgniteSpiOperationTimeoutException e) {
+                            assert ses != null;
 
-        return client;
-    }
+                            final GridNioSession ses0 = ses;
 
-    /**
-     * Performs handshake in timeout-safe way.
-     *
-     * @param client Client.
-     * @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}.
-     * @param rmtNodeId Remote node.
-     * @param timeout Timeout for handshake.
-     * @param sslMeta Session meta.
-     * @param handshakeConnIdx Non null connection index if need send it in handshake.
-     * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout.
-     * @return Handshake response.
-     */
-    @SuppressWarnings("ThrowFromFinallyBlock")
-    private <T> long safeHandshake(
-        T client,
-        @Nullable GridNioRecoveryDescriptor recovery,
-        UUID rmtNodeId,
-        long timeout,
-        GridSslMeta sslMeta,
-        @Nullable Integer handshakeConnIdx
-    ) throws IgniteCheckedException {
-        HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
+                            commWorker.addSessionStateChangeRequest(new SessionInfo(null, SessionState.RETRY, new Runnable() {
+                                @Override public void run() {
+                                    GridNioFuture<Boolean> closeFut = nioSrvr.close(ses0);
 
-        addTimeoutObject(obj);
+                                    closeFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+                                        @Override public void apply(IgniteInternalFuture<Boolean> fut0) {
+                                            Runnable clo = new Runnable() {
+                                                @Override public void run() {
+                                                    onError(e);
+                                                }
+                                            };
 
-        long rcvCnt = 0;
+                                            SessionInfo sesInfo = new SessionInfo(null, SessionState.RETRY, clo);
 
-        try {
-            if (client instanceof GridCommunicationClient)
-                ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId));
-            else {
-                SocketChannel ch = (SocketChannel)client;
+                                            commWorker.addSessionStateChangeRequest(sesInfo);
+                                        }
+                                    });
+                                }
+                            }));
+                        }
+                        catch (IgniteCheckedException e) {
+                            connTimeoutObj.cancel();
 
-                boolean success = false;
+                            removeTimeoutObject(connTimeoutObj);
 
-                try {
-                    BlockingSslHandler sslHnd = null;
+                            recoveryDesc.release();
 
-                    ByteBuffer buf;
+                            onError(e);
+                        }
+                    }
+                };
 
-                    if (isSslEnabled()) {
-                        assert sslMeta != null;
+                nioSrvr.createSession(ch, meta, !connect, lsnr0);
+            }
+            catch (Exception e) {
+                onDone(e);
+            }
+        }
 
-                        sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.nativeOrder(), log);
+        /**
+         * @param e Exception.
+         */
+        void onError(Exception e) {
+            if (e instanceof HandshakeTimeoutException || e instanceof IgniteSpiOperationTimeoutException) {
+                if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
+                    timeoutHelper.checkFailureTimeoutReached(e))) {
 
-                        if (!sslHnd.handshake())
-                            throw new IgniteCheckedException("SSL handshake is not completed.");
+                    String msg = "Handshake timed out (failure detection timeout is reached) " +
+                        "[failureDetectionTimeout=" + failureDetectionTimeout() + ", addr=" + currAddr + ']';
 
-                        ByteBuffer handBuff = sslHnd.applicationBuffer();
+                    onException(msg, e);
 
-                        if (handBuff.remaining() < 17) {
-                            buf = ByteBuffer.allocate(1000);
+                    if (log.isDebugEnabled())
+                        log.debug(msg);
 
-                            int read = ch.read(buf);
+                    if (err == null)
+                        err = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
+                            "Make sure that each ComputeTask and cache Transaction has a timeout set " +
+                            "in order to prevent parties from waiting forever in case of network issues " +
+                            "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
 
-                            if (read == -1)
-                                throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
+                    err.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + currAddr, e));
 
-                            buf.flip();
+                    tryConnect(true);
 
-                            buf = sslHnd.decode(buf);
-                        }
-                        else
-                            buf = handBuff;
-                    }
-                    else {
-                        buf = ByteBuffer.allocate(17);
+                    return;
+                }
 
-                        for (int i = 0; i < 17; ) {
-                            int read = ch.read(buf);
+                assert !failureDetectionTimeoutEnabled();
 
-                            if (read == -1)
-                                throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
+                long connTimeout0 = connTimeout * attempt;
 
-                            i += read;
-                        }
-                    }
+                onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout +
+                    ", addr=" + currAddr + ']', e);
 
-                    UUID rmtNodeId0 = U.bytesToUuid(buf.array(), 1);
+                if (log.isDebugEnabled())
+                    log.debug(
+                        "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout +
+                            ", addr=" + currAddr + ", err=" + e + ']');
 
-                    if (!rmtNodeId.equals(rmtNodeId0))
-                        throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId +
-                            ", rcvd=" + rmtNodeId0 + ']');
-                    else if (log.isDebugEnabled())
-                        log.debug("Received remote node ID: " + rmtNodeId0);
+                if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
+                    if (log.isDebugEnabled())
+                        log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
+                            "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
+                            ", attempt=" + attempt + ", reconCnt=" + reconCnt +
+                            ", err=" + e.getMessage() + ", addr=" + currAddr + ']');
 
-                    if (isSslEnabled()) {
-                        assert sslHnd != null;
+                    if (err == null)
+                        err = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
+                            "Make sure that each ComputeTask and cache Transaction has a timeout set " +
+                            "in order to prevent parties from waiting forever in case of network issues " +
+                            "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
 
-                        ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
-                    }
-                    else
-                        ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
+                    err.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + currAddr, e));
 
-                    ClusterNode locNode = getLocalNode();
+                    tryConnect(true);
+                }
+                else {
+                    attempt++;
 
-                    if (locNode == null)
-                        throw new IgniteCheckedException("Local node has not been started or " +
-                            "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
+                    tryConnect(false); // Reconnection on handshake timeout.
+                }
+            }
+            else {
+                onException("Client creation failed [addr=" + currAddr + ", err=" + e + ']', e);
 
-                    if (recovery != null) {
-                        HandshakeMessage msg;
+                if (log.isDebugEnabled())
+                    log.debug("Client creation failed [addr=" + currAddr + ", err=" + e + ']');
 
-                        int msgSize = 33;
+                boolean failureDetThrReached = timeoutHelper.checkFailureTimeoutReached(e);
 
-                        if (handshakeConnIdx != null) {
-                            msg = new HandshakeMessage2(locNode.id(),
-                                recovery.incrementConnectCount(),
-                                recovery.received(),
-                                handshakeConnIdx);
+                if (failureDetThrReached)
+                    LT.warn(log, "Connect timed out (consider increasing 'failureDetectionTimeout' " +
+                        "configuration property) [addr=" + currAddr + ", failureDetectionTimeout=" +
+                        failureDetectionTimeout() + ']');
+                else if (X.hasCause(e, SocketTimeoutException.class))
+                    LT.warn(log, "Connect timed out (consider increasing 'connTimeout' " +
+                        "configuration property) [addr=" + currAddr + ", connTimeout=" + connTimeout + ']');
 
-                            msgSize += 4;
-                        }
-                        else {
-                            msg = new HandshakeMessage(locNode.id(),
-                                recovery.incrementConnectCount(),
-                                recovery.received());
-                        }
+                if (err == null)
+                    err = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
+                        "Make sure that each ComputeTask and GridCacheTransaction has a timeout set " +
+                        "in order to prevent parties from waiting forever in case of network issues " +
+                        "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
 
-                        if (log.isDebugEnabled())
-                            log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
+                err.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + currAddr, e));
 
-                        buf = ByteBuffer.allocate(msgSize);
+                // Reconnect for the second time, if connection is not established.
+                int connectAttempts0;
 
-                        buf.order(ByteOrder.nativeOrder());
+                if (!failureDetThrReached && (connectAttempts0 = connectAttempts) <= 2 &&
+                    (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) {
+                    connectAttempts = connectAttempts0 + 1;
 
-                        boolean written = msg.writeTo(buf, null);
+                    tryConnect(false);
 
-                        assert written;
+                    return;
+                }
 
-                        buf.flip();
+                tryConnect(true);
+            }
 
-                        if (isSslEnabled()) {
-                            assert sslHnd != null;
+            onDone(e);
+        }
 
-                            ch.write(sslHnd.encrypt(buf));
-                        }
-                        else
-                            ch.write(buf);
-                    }
-                    else {
-                        if (isSslEnabled()) {
-                            assert sslHnd != null;
+        /**
+         *
+         */
+        private Collection<InetSocketAddress> addrs() throws IgniteCheckedException {
+            Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS));
+            Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
+            Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
+            Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
 
-                            ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)));
-                        }
-                        else
-                            ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
-                    }
+            boolean isRmtAddrsExist = (!F.isEmpty(rmtAddrs0) && boundPort != null);
+            boolean isExtAddrsExist = !F.isEmpty(extAddrs);
 
-                    if (recovery != null) {
-                        if (log.isDebugEnabled())
-                            log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
+            if (!isRmtAddrsExist && !isExtAddrsExist)
+                throw new IgniteCheckedException("Failed to send message to the destination node. Node doesn't have any " +
+                    "TCP communication addresses or mapped external addresses. Check configuration and make sure " +
+                    "that you use the same communication SPI on all nodes. Remote node id: " + node.id());
 
-                        if (isSslEnabled()) {
-                            assert sslHnd != null;
+            LinkedHashSet<InetSocketAddress> addrs;
 
-                            buf = ByteBuffer.allocate(1000);
+            // Try to connect first on bound addresses.
+            if (isRmtAddrsExist) {
+                List<InetSocketAddress> addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort));
 
-                            ByteBuffer decode = null;
+                boolean sameHost = U.sameMacs(getSpiContext().localNode(), node);
 
-                            buf.order(ByteOrder.nativeOrder());
+                Collections.sort(addrs0, U.inetAddressesComparator(sameHost));
 
-                            for (int i = 0; i < 9; ) {
-                                int read = ch.read(buf);
+                addrs = new LinkedHashSet<>(addrs0);
+            }
+            else
+                addrs = new LinkedHashSet<>();
+
+            // Then on mapped external addresses.
+            if (isExtAddrsExist)
+                addrs.addAll(extAddrs);
+
+            return addrs;
+        }
 
-                                if (read == -1)
-                                    throw new IgniteCheckedException("Failed to read remote node recovery handshake " +
-                                        "(connection closed).");
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TcpClientFuture.class, this);
+        }
+    }
+
+    /**
+     * Performs handshake in timeout-safe way.
+     *
+     * @param client Client.
+     * @param rmtNodeId Remote node.
+     * @param timeout Timeout for handshake.
+     * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout.
+     * @return Handshake response.
+     */
+    @SuppressWarnings("ThrowFromFinallyBlock")
+    private <T> long safeHandshake(T client, UUID rmtNodeId, long timeout) throws IgniteCheckedException {
+        HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, null, U.currentTimeMillis() + timeout);
 
-                                buf.flip();
+        addTimeoutObject(obj);
 
-                                decode = sslHnd.decode(buf);
+        long rcvCnt = 0;
 
-                                i += decode.remaining();
+        try {
+            if (client instanceof GridCommunicationClient)
+                ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId));
+            else {
+                SocketChannel ch = (SocketChannel)client;
 
-                                buf.clear();
-                            }
+                boolean success = false;
 
-                            rcvCnt = decode.getLong(1);
+                try {
+                    ByteBuffer buf;
 
-                            if (decode.limit() > 9) {
-                                decode.position(9);
+                    buf = ByteBuffer.allocate(17);
 
-                                sslMeta.decodedBuffer(decode);
-                            }
+                    for (int i = 0; i < 17; ) {
+                        int read = ch.read(buf);
 
-                            ByteBuffer inBuf = sslHnd.inputBuffer();
+                        if (read == -1)
+                            throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
 
-                            if (inBuf.position() > 0)
-                                sslMeta.encodedBuffer(inBuf);
-                        }
-                        else {
-                            buf = ByteBuffer.allocate(9);
+                        i += read;
+                    }
 
-                            buf.order(ByteOrder.nativeOrder());
+                    UUID rmtNodeId0 = U.bytesToUuid(buf.array(), 1);
 
-                            for (int i = 0; i < 9; ) {
-                                int read = ch.read(buf);
+                    if (!rmtNodeId.equals(rmtNodeId0))
+                        throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId +
+                            ", rcvd=" + rmtNodeId0 + ']');
+                    else if (log.isDebugEnabled())
+                        log.debug("Received remote node ID: " + rmtNodeId0);
 
-                                if (read == -1)
-                                    throw new IgniteCheckedException("Failed to read remote node recovery handshake " +
-                                        "(connection closed).");
+                    ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
 
-                                i += read;
-                            }
+                    ClusterNode locNode = getLocalNode();
 
-                            rcvCnt = buf.getLong(1);
-                        }
+                    if (locNode == null)
+                        throw new IgniteCheckedException("Local node has not been started or " +
+                            "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
 
-                        if (log.isDebugEnabled())
-                            log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+                    ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
 
-                        if (rcvCnt == -1) {
-                            if (log.isDebugEnabled())
-                                log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
-                        }
-                        else
-                            success = true;
-                    }
-                    else
-                        success = true;
+                    success = true;
                 }
                 catch (IOException e) {
                     if (log.isDebugEnabled())
@@ -3573,7 +3879,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      */
     private class CommunicationWorker extends IgniteSpiThread {
         /** */
-        private final BlockingQueue<DisconnectedSessionInfo> q = new LinkedBlockingQueue<>();
+        private final BlockingQueue<SessionInfo> q = new LinkedBlockingQueue<>();
 
         /**
          * @param gridName Grid name.
@@ -3588,10 +3894,56 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 log.debug("Tcp communication worker has been started.");
 
             while (!isInterrupted()) {
-                DisconnectedSessionInfo disconnectData = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
+                SessionInfo sesInfo = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
+
+                if (sesInfo != null) {
+                    ConnectContext ctx;
+
+                    TcpClientFuture clientFut;
+
+                    switch (sesInfo.state) {
+                        case RECONNECT:
+                            processDisconnect(sesInfo);
+
+                            break;
+
+                        case RETRY:
+                            Runnable clo = sesInfo.clo;
+
+                            assert clo != null;
+
+                            clo.run();
+
+                            break;
+
+                        case READY:
+                            ctx = sesInfo.ses.meta(CONN_CTX_META_KEY);
+
+                            assert ctx != null;
+                            assert ctx.tcpClientFut != null;
+
+                            GridTcpNioCommunicationClient client =
+                                    new GridTcpNioCommunicationClient(ctx.connIdx, sesInfo.ses, log);
+
+                            ctx.tcpClientFut.onDone(client);
+
+                            break;
 
-                if (disconnectData != null)
-                    processDisconnect(disconnectData);
+                        case CLOSE:
+                            ctx = sesInfo.ses.meta(CONN_CTX_META_KEY);
+
+                            nioSrvr.close(sesInfo.ses);
+
+                            if (ctx != null && (clientFut = ctx.tcpClientFut) != null) {
+                                if (sesInfo.err != null)
+                                    clientFut.onError(sesInfo.err);
+                                else
+                                    clientFut.onDone();
+                            }
+
+                            break;
+                    }
+                }
                 else
                     processIdle();
             }
@@ -3753,51 +4105,73 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         /**
          * @param sesInfo Disconnected session information.
          */
-        private void processDisconnect(DisconnectedSessionInfo sesInfo) {
-            GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;
+        private void processDisconnect(final SessionInfo sesInfo) {
+            assert sesInfo.state == SessionState.RECONNECT : sesInfo.state;
+
+            final GridNioRecoveryDescriptor recoveryDesc = sesInfo.ses.outRecoveryDescriptor();
 
-            ClusterNode node = recoveryDesc.node();
+            assert recoveryDesc != null;
+
+            final ClusterNode node = recoveryDesc.node();
 
             if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
                 return;
 
-            try {
-                if (log.isDebugEnabled())
-                    log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
+            if (log.isDebugEnabled())
+                log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
 
-                GridCommunicationClient client = reserveClient(node, sesInfo.connIdx);
+            int connIdx = sesInfo.connIdx;
 
+            final GridCommunicationClient client = nodeClient(node.id(), connIdx);
+
+            if (client != null && client.reserve())
                 client.release();
-            }
-            catch (IgniteCheckedException | IgniteException e) {
-                try {
-                    if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) {
-                        if (log.isDebugEnabled())
-                            log.debug("Recovery reconnect failed, will retry " +
-                                "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+            else {
+                if (client != null)
+                    clients.remove(node.id(), client);
 
-                        addProcessDisconnectRequest(sesInfo);
-                    }
-                    else {
-                        if (log.isDebugEnabled())
-                            log.debug("Recovery reconnect failed, " +
-                                "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+                final IgniteInternalFuture<GridCommunicationClient> fut = reserveClient(node, connIdx);
+
+                fut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+                    @Override public void apply(IgniteInternalF

<TRUNCATED>

[3/3] ignite git commit: ignite-4003 Async outgoing connections for communication SPI

Posted by ag...@apache.org.
ignite-4003 Async outgoing connections for communication SPI


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/99e24a6c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/99e24a6c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/99e24a6c

Branch: refs/heads/ignite-4003
Commit: 99e24a6c3b69e13f27ab9259c0981dbbafa86e04
Parents: 3ef7a0e
Author: agura <ag...@apache.org>
Authored: Tue Feb 7 14:45:57 2017 +0300
Committer: agura <ag...@apache.org>
Committed: Mon Feb 13 15:48:33 2017 +0300

----------------------------------------------------------------------
 .../GridClientConnectionManagerAdapter.java     |    1 -
 .../connection/GridClientNioTcpConnection.java  |    2 +-
 .../managers/communication/GridIoManager.java   |    4 +
 .../internal/util/GridSpinReadWriteLock.java    |    2 +-
 .../util/nio/GridNioRecoveryDescriptor.java     |    2 +-
 .../ignite/internal/util/nio/GridNioServer.java |  221 ++-
 .../util/nio/GridSelectorNioSessionImpl.java    |    2 -
 .../internal/util/nio/ssl/GridNioSslFilter.java |   12 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 1851 ++++++++++++------
 .../IgniteCacheMessageWriteTimeoutTest.java     |    4 +-
 .../internal/util/nio/GridNioSelfTest.java      |    2 +-
 .../spi/GridTcpSpiForwardingSelfTest.java       |    3 +-
 .../GridAbstractCommunicationSelfTest.java      |   27 +-
 ...mmunicationSpiConcurrentConnectSelfTest.java |   28 +-
 ...dTcpCommunicationSpiRecoveryAckSelfTest.java |   51 +-
 ...GridTcpCommunicationSpiRecoverySelfTest.java |   49 +-
 ...CommunicationRecoveryAckClosureSelfTest.java |   36 +-
 .../tcp/TcpCommunicationSpiDropNodesTest.java   |   11 +-
 .../TcpCommunicationSpiFaultyClientTest.java    |    4 +-
 .../HadoopExternalCommunication.java            |    5 +-
 20 files changed, 1629 insertions(+), 688 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
index f714e7a..e9d0340 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
@@ -183,7 +183,6 @@ public abstract class GridClientConnectionManagerAdapter implements GridClientCo
                     GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
 
                     sslFilter.directMode(false);
-                    sslFilter.clientMode(true);
 
                     filters = new GridNioFilter[]{codecFilter, sslFilter};
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
index 8937504..215c697 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
@@ -234,7 +234,7 @@ public class GridClientNioTcpConnection extends GridClientConnection {
                 meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut);
             }
 
-            ses = (GridNioSession)srv.createSession(ch, meta).get();
+            ses = (GridNioSession)srv.createSession(ch, meta, false, null).get();
 
             if (sslHandshakeFut != null)
                 sslHandshakeFut.get();

http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 7ef7bc0..5d17024 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteComponentType;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.direct.DirectMessageReader;
 import org.apache.ignite.internal.direct.DirectMessageWriter;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
@@ -1281,6 +1282,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                     ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackC);
                 else
                     getSpi().sendMessage(node, ioMsg);
+
+                if (ctx.discovery().node(node.id()) == null)
+                    throw new ClusterTopologyCheckedException("Failed to send message to node, node left: " + node);
             }
             catch (IgniteSpiException e) {
                 throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
index 4f23979..8fef887 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
@@ -70,7 +70,7 @@ public class GridSpinReadWriteLock {
     private int writeLockEntryCnt;
 
     /**
-     * Acquires write lock.
+     * Acquires read lock.
      */
     @SuppressWarnings("BusyWait")
     public void readLock() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 6258c13..af7b757 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -259,7 +259,7 @@ public class GridNioRecoveryDescriptor {
 
     /**
      * @param node Node.
-     * @return {@code True} if node is not null and has the same order as initial remtoe node.
+     * @return {@code True} if node is not null and has the same order as initial remote node.
      */
     public boolean nodeAlive(@Nullable ClusterNode node) {
         return node != null && node.order() == this.node.order();

http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index a59adba..b8f367d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -111,6 +111,12 @@ public class GridNioServer<T> {
     /** SSL write buf limit. */
     private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey();
 
+    /** Session future meta key. */
+    private static final int SESSION_FUT_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** Selection key meta key. */
+    private static final int WORKER_IDX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
     /** */
     private static final boolean DISABLE_KEYSET_OPTIMIZATION =
         IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_NO_SELECTOR_OPTS);
@@ -463,7 +469,7 @@ public class GridNioServer<T> {
      * @return Future for operation.
      */
     public GridNioFuture<Boolean> close(GridNioSession ses) {
-        assert ses instanceof GridSelectorNioSessionImpl;
+        assert ses instanceof GridSelectorNioSessionImpl : ses;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
@@ -701,6 +707,7 @@ public class GridNioServer<T> {
     /**
      *
      */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
     public void dumpStats() {
         U.warn(log, "NIO server statistics [readerSesBalanceCnt=" + readerMoveCnt.get() +
             ", writerSesBalanceCnt=" + writerMoveCnt.get() + ']');
@@ -714,17 +721,34 @@ public class GridNioServer<T> {
      *
      * @param ch Channel to register within the server and create session for.
      * @param meta Optional meta for new session.
+     * @param async Async connection.
+     * @param lsnr Listener that should be invoked in NIO thread.
      * @return Future to get session.
      */
-    public GridNioFuture<GridNioSession> createSession(final SocketChannel ch,
-        @Nullable Map<Integer, ?> meta) {
+    public GridNioFuture<GridNioSession> createSession(
+        final SocketChannel ch,
+        @Nullable Map<Integer, Object> meta,
+        boolean async,
+        @Nullable IgniteInClosure<? super IgniteInternalFuture<GridNioSession>> lsnr
+    ) {
         try {
             if (!closed) {
                 ch.configureBlocking(false);
 
                 NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
 
-                offerBalanced(req);
+                if (async) {
+                    assert meta != null;
+
+                    req.op = NioOperation.CONNECT;
+
+                    meta.put(SESSION_FUT_META_KEY, req);
+                }
+
+                if (lsnr != null)
+                    req.listen(lsnr);
+
+                offerBalanced(req, meta);
 
                 return req;
             }
@@ -738,6 +762,29 @@ public class GridNioServer<T> {
     }
 
     /**
+     * @param ch Channel.
+     * @param meta Session meta.
+     */
+    public GridNioFuture<GridNioSession> cancelConnect(final SocketChannel ch, Map<Integer, ?> meta) {
+        if (!closed) {
+            NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
+
+            req.op = NioOperation.CANCEL_CONNECT;
+
+            Integer idx = (Integer)meta.get(WORKER_IDX_META_KEY);
+
+            assert idx != null : meta;
+
+            clientWorkers.get(idx).offer(req);
+
+            return req;
+        }
+        else
+            return new GridNioFinishedFuture<>(
+                new IgniteCheckedException("Failed to cancel connection, server is stopped."));
+    }
+
+    /**
      * Gets configurable write timeout for this session. If not set, default value is {@link #DFLT_SES_WRITE_TIMEOUT}.
      *
      * @return Write timeout in milliseconds.
@@ -823,9 +870,11 @@ public class GridNioServer<T> {
 
     /**
      * @param req Request to balance.
+     * @param meta Session metadata.
+     * @return Worker index.
      */
-    private synchronized void offerBalanced(NioOperationFuture req) {
-        assert req.operation() == NioOperation.REGISTER : req;
+    private synchronized int offerBalanced(NioOperationFuture req, @Nullable Map<Integer, Object> meta) {
+        assert req.operation() == NioOperation.REGISTER || req.operation() == NioOperation.CONNECT: req;
         assert req.socketChannel() != null : req;
 
         int workers = clientWorkers.size();
@@ -863,7 +912,12 @@ public class GridNioServer<T> {
         else
             balanceIdx = 0;
 
+        if (meta != null)
+            meta.put(WORKER_IDX_META_KEY, balanceIdx);
+
         clientWorkers.get(balanceIdx).offer(req);
+
+        return balanceIdx;
     }
 
     /** {@inheritDoc} */
@@ -1687,6 +1741,38 @@ public class GridNioServer<T> {
 
                     while ((req0 = changeReqs.poll()) != null) {
                         switch (req0.operation()) {
+                            case CONNECT: {
+                                NioOperationFuture req = (NioOperationFuture)req0;
+
+                                SocketChannel ch = req.socketChannel();
+
+                                try {
+                                    ch.register(selector, SelectionKey.OP_CONNECT, req.meta());
+                                }
+                                catch (IOException e) {
+                                    req.onDone(new IgniteCheckedException("Failed to register channel on selector", e));
+                                }
+
+                                break;
+                            }
+
+                            case CANCEL_CONNECT: {
+                                NioOperationFuture req = (NioOperationFuture)req0;
+
+                                SocketChannel ch = req.socketChannel();
+
+                                SelectionKey key = ch.keyFor(selector);
+
+                                if (key != null)
+                                    key.cancel();
+
+                                U.closeQuiet(ch);
+
+                                req.onDone();
+
+                                break;
+                            }
+
                             case REGISTER: {
                                 register((NioOperationFuture)req0);
 
@@ -1893,8 +1979,12 @@ public class GridNioServer<T> {
                         log.debug("Closing all connected client sockets.");
 
                     // Close all channels registered with selector.
-                    for (SelectionKey key : selector.keys())
-                        close((GridSelectorNioSessionImpl)key.attachment(), null);
+                    for (SelectionKey key : selector.keys()) {
+                        Object attach = key.attachment();
+
+                        if (attach instanceof GridSelectorNioSessionImpl)
+                            close((GridSelectorNioSessionImpl)attach, null);
+                    }
 
                     if (log.isDebugEnabled())
                         log.debug("Closing NIO selector.");
@@ -2017,11 +2107,19 @@ public class GridNioServer<T> {
                 if (!key.isValid())
                     continue;
 
-                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
-
-                assert ses != null;
+                GridSelectorNioSessionImpl ses = null;
 
                 try {
+                    if (key.isConnectable()) {
+                        processConnect(key);
+
+                        continue;
+                    }
+
+                    ses = (GridSelectorNioSessionImpl)key.attachment();
+
+                    assert ses != null;
+
                     if (key.isReadable())
                         processRead(key);
 
@@ -2033,9 +2131,11 @@ public class GridNioServer<T> {
                     throw e;
                 }
                 catch (Exception e) {
-                    U.warn(log, "Failed to process selector key (will close): " + ses, e);
+                    if (!closed)
+                        U.error(log, "Failed to process selector key [ses=" + ses + ']', e);
 
-                    close(ses, new GridNioException(e));
+                    if (ses != null)
+                        close(ses, new GridNioException(e));
                 }
             }
         }
@@ -2062,11 +2162,19 @@ public class GridNioServer<T> {
                 if (!key.isValid())
                     continue;
 
-                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
-
-                assert ses != null;
+                GridSelectorNioSessionImpl ses = null;
 
                 try {
+                    if (key.isConnectable()) {
+                        processConnect(key);
+
+                        continue;
+                    }
+
+                    ses = (GridSelectorNioSessionImpl)key.attachment();
+
+                    assert ses != null;
+
                     if (key.isReadable())
                         processRead(key);
 
@@ -2079,9 +2187,10 @@ public class GridNioServer<T> {
                 }
                 catch (Exception e) {
                     if (!closed)
-                        U.warn(log, "Failed to process selector key (will close): " + ses, e);
+                        U.error(log, "Failed to process selector key [ses=" + ses + ']', e);
 
-                    close(ses, new GridNioException(e));
+                    if (ses != null)
+                        close(ses, new GridNioException(e));
                 }
             }
         }
@@ -2095,7 +2204,12 @@ public class GridNioServer<T> {
             long now = U.currentTimeMillis();
 
             for (SelectionKey key : keys) {
-                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+                Object obj = key.attachment();
+
+                if (!(obj instanceof GridSelectorNioSessionImpl))
+                    continue;
+
+                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)obj;
 
                 try {
                     long writeTimeout0 = writeTimeout;
@@ -2176,12 +2290,32 @@ public class GridNioServer<T> {
                         ses.addMeta(e.getKey(), e.getValue());
                 }
 
-                SelectionKey key = sockCh.register(selector, SelectionKey.OP_READ, ses);
+                SelectionKey key;
 
-                ses.key(key);
+                if (!sockCh.isRegistered())
+                    key = sockCh.register(selector, SelectionKey.OP_READ, ses);
+                else {
+                    key = sockCh.keyFor(selector);
+
+                    Map<Integer, Object> m = (Map<Integer, Object>)key.attachment();
+
+                    NioOperationFuture<GridNioSession> fut =
+                        (NioOperationFuture<GridNioSession>)m.remove(SESSION_FUT_META_KEY);
+
+                    assert fut != null;
+
+                    for (Entry<Integer, Object> e : m.entrySet())
+                        ses.addMeta(e.getKey(), e.getValue());
+
+                    key.attach(ses);
+
+                    key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT));
+                    key.interestOps(key.interestOps() | SelectionKey.OP_READ);
 
-                if (!ses.accepted())
-                    resend(ses);
+                    fut.onDone(ses);
+                }
+
+                ses.key(key);
 
                 sessions.add(ses);
                 workerSessions.add(ses);
@@ -2316,6 +2450,34 @@ public class GridNioServer<T> {
         }
 
         /**
+         * @param key Key.
+         * @throws IOException If failed.
+         */
+        @SuppressWarnings("unchecked")
+        private void processConnect(SelectionKey key) throws IOException {
+            SocketChannel ch = (SocketChannel)key.channel();
+
+            Map<Integer, Object> meta = (Map<Integer, Object>)key.attachment();
+
+            try {
+                if (ch.finishConnect())
+                    register(new NioOperationFuture<GridNioSession>(ch, false, meta));
+            }
+            catch (IOException e) {
+                NioOperationFuture<GridNioSession> sesFut =
+                    (NioOperationFuture<GridNioSession>)meta.get(SESSION_FUT_META_KEY);
+
+                assert sesFut != null;
+
+                U.closeQuiet(ch);
+
+                sesFut.onDone(new GridNioException("Failed to connect to node", e));
+
+                throw e;
+            }
+        }
+
+        /**
          * Processes read-available event on the key.
          *
          * @param key Key that is ready to be read.
@@ -2530,14 +2692,20 @@ public class GridNioServer<T> {
          * @param sockCh Socket channel to be registered on one of the selectors.
          */
         private void addRegistrationReq(SocketChannel sockCh) {
-            offerBalanced(new NioOperationFuture(sockCh));
+            offerBalanced(new NioOperationFuture(sockCh), null);
         }
     }
 
     /**
      * Asynchronous operation that may be requested on selector.
      */
-    enum NioOperation {
+    private enum NioOperation {
+        /** Register connect key selection. */
+        CONNECT,
+
+        /** Cancel connect. */
+        CANCEL_CONNECT,
+
         /** Register read key selection. */
         REGISTER,
 
@@ -2844,8 +3012,7 @@ public class GridNioServer<T> {
          * @param commMsg Direct message.
          * @param skipRecovery Skip recovery flag.
          */
-        NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op,
-            Message commMsg, boolean skipRecovery) {
+        NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op, Message commMsg, boolean skipRecovery) {
             assert ses != null;
             assert op != null;
             assert op != NioOperation.REGISTER;

http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 66f9176..2280321 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -393,8 +393,6 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
             if (!outRecovery.pairedConnections())
                 inRecovery = outRecovery;
 
-            outRecovery.onConnected();
-
             return null;
         }
         else

http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
index 8ed7db0..2a1969f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
@@ -68,9 +68,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
     /** Allocate direct buffer or heap buffer. */
     private boolean directBuf;
 
-    /** Whether SSLEngine should use client mode. */
-    private boolean clientMode;
-
     /** Whether direct mode is used. */
     private boolean directMode;
 
@@ -92,13 +89,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
     }
 
     /**
-     * @param clientMode Flag indicating whether SSLEngine should use client mode..
-     */
-    public void clientMode(boolean clientMode) {
-        this.clientMode = clientMode;
-    }
-
-    /**
      *
      * @param directMode Flag indicating whether direct mode is used.
      */
@@ -163,6 +153,8 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
         if (sslMeta == null) {
             engine = sslCtx.createSSLEngine();
 
+            boolean clientMode = !ses.accepted();
+
             engine.setUseClientMode(clientMode);
 
             if (!clientMode) {