You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/20 08:04:40 UTC

[32/50] [abbrv] ignite git commit: zk

zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ed492a4f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ed492a4f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ed492a4f

Branch: refs/heads/ignite-zk
Commit: ed492a4f3aca046b6e196420bd986d0f47232aed
Parents: 78a994a
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 15 16:00:49 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 15 17:08:26 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |  17 -
 .../communication/tcp/TcpCommunicationSpi.java  | 148 ++-----
 .../tcp/internal/ConnectionKey.java             |  92 ++++
 .../TcpCommunicationConnectionCheckFuture.java  | 423 +++++++++++++++++--
 ...pCommunicationNodeConnectionCheckFuture.java |  30 ++
 .../GridTcpCommunicationSpiAbstractTest.java    |  83 ++--
 6 files changed, 618 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ed492a4f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index a30c439..78cb7a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -117,8 +117,6 @@ import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse;
 import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
-import org.apache.ignite.internal.processors.cluster.ClusterMetricsUpdateMessage;
-import org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartResultMessage;
 import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
@@ -186,11 +184,6 @@ public class GridIoMessageFactory implements MessageFactory {
         switch (type) {
             // -54 is reserved for SQL.
             // -46 ... -51 - snapshot messages.
-            case -62:
-                msg = new TcpCommunicationConnectionCheckMessage();
-
-                break;
-
             case -61:
                 msg = new IgniteDiagnosticMessage();
 
@@ -886,16 +879,6 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
-            case 129:
-                msg = new ClusterMetricsUpdateMessage();
-
-                break;
-
-            case 130:
-                msg = new ContinuousRoutineStartResultMessage();
-
-                break;
-
 
             // [-3..119] [124..128] [-23..-27] [-36..-55]- this
             // [120..123] - DR

http://git-wip-us.apache.org/repos/asf/ignite/blob/ed492a4f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 3588a79..afa6953 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.util.GridConcurrentFactory;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.ipc.IpcEndpoint;
 import org.apache.ignite.internal.util.ipc.IpcToNioAdapter;
 import org.apache.ignite.internal.util.ipc.shmem.IpcOutOfSystemResourcesException;
@@ -82,7 +83,6 @@ import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter;
 import org.apache.ignite.internal.util.nio.GridDirectParser;
 import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
 import org.apache.ignite.internal.util.nio.GridNioFilter;
-import org.apache.ignite.internal.util.nio.GridNioFuture;
 import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory;
 import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
 import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
@@ -135,11 +135,13 @@ import org.apache.ignite.spi.IgniteSpiThread;
 import org.apache.ignite.spi.IgniteSpiTimeoutObject;
 import org.apache.ignite.spi.communication.CommunicationListener;
 import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.internal.ConnectionKey;
+import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture;
+import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationNodeConnectionCheckFuture;
 import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
 import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2;
 import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage;
 import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage;
-import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.thread.IgniteThread;
@@ -149,6 +151,8 @@ import org.jsr166.ConcurrentLinkedDeque8;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
+import static org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture.CONN_CHECK_DUMMY_KEY;
+import static org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture.SES_FUT_META;
 import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
 import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NEED_WAIT;
 import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NODE_STOPPING;
@@ -318,12 +322,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     /** Message tracker meta for session. */
     private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
 
-    /** Session future. */
-    public static final int SES_FUT_META = GridNioSessionMetaKey.nextUniqueKey();
-
-    /** */
-    public static final ConnectionKey CONN_CHECK_DUMMY_KEY = new ConnectionKey(null, -1, -1);
-
     /**
      * Default local port range (value is <tt>100</tt>).
      * See {@link #setLocalPortRange(int)} for details.
@@ -409,17 +407,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 else {
                     ConnectionKey connId = ses.meta(CONN_IDX_META);
 
-                    if (connId != CONN_CHECK_DUMMY_KEY) {
-                        if (log.isInfoEnabled())
-                            log.info("Established outgoing communication connection [locAddr=" + ses.localAddress() +
-                                ", rmtAddr=" + ses.remoteAddress() + ']');
-                    }
+                    if (log.isInfoEnabled())
+                        log.info("Established outgoing communication connection [locAddr=" + ses.localAddress() +
+                            ", rmtAddr=" + ses.remoteAddress() + ']');
                 }
             }
 
             @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
                 ConnectionKey connId = ses.meta(CONN_IDX_META);
 
+                if (connId == CONN_CHECK_DUMMY_KEY)
+                    return;
+
                 if (connId != null) {
                     UUID id = connId.nodeId();
 
@@ -691,7 +690,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 ConnectionKey connKey = ses.meta(CONN_IDX_META);
 
                 if (connKey == null) {
-                    assert ses.accepted() : msg;
+                    assert ses.accepted() : ses;
 
                     if (!connectGate.tryEnter()) {
                         if (log.isDebugEnabled())
@@ -714,9 +713,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                     }
                 }
                 else {
-                    metricsLsnr.onMessageReceived(msg, connKey.nodeId());
-
                     if (msg instanceof RecoveryLastReceivedMessage) {
+                        metricsLsnr.onMessageReceived(msg, connKey.nodeId());
+
                         GridNioRecoveryDescriptor recovery = ses.outRecoveryDescriptor();
 
                         if (recovery != null) {
@@ -729,9 +728,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                             }
 
                             recovery.ackReceived(msg0.received());
-
-                            return;
                         }
+
+                        return;
                     }
                     else {
                         GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();
@@ -754,9 +753,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         else if (connKey == CONN_CHECK_DUMMY_KEY) {
                             assert msg instanceof NodeIdMessage : msg;
 
-                            TcpCommunicationConnectionCheckFuture fut = ses.meta(SES_FUT_META);
+                            TcpCommunicationNodeConnectionCheckFuture fut = ses.meta(SES_FUT_META);
+
+                            assert fut != null : msg;
 
-                            fut.onConnected(U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0));
+                            fut.onConnected(U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0));
 
                             nioSrvr.closeFromWorkerThread(ses);
 
@@ -764,6 +765,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         }
                     }
 
+                    metricsLsnr.onMessageReceived(msg, connKey.nodeId());
+
                     IgniteRunnable c;
 
                     if (msgQueueLimit > 0) {
@@ -2592,18 +2595,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         sendMessage0(node, msg, null);
     }
 
+    /** {@inheritDoc} */
     public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) {
-        ClusterNode node = nodes.get(0);
-
-        try {
-            Collection<InetSocketAddress> addrs = nodeAddresses(node);
+        TcpCommunicationConnectionCheckFuture fut = new TcpCommunicationConnectionCheckFuture(
+            this,
+            log.getLogger(TcpCommunicationConnectionCheckFuture.class),
+            nioSrvr,
+            nodes);
 
-        }
-        catch (Exception e) {
-            throw new IgniteSpiException(e);
-        }
+        fut.init(failureDetectionTimeoutEnabled() ? failureDetectionTimeout() : connTimeout);
 
-        return null;
+        return new IgniteFutureImpl<>(fut);
     }
 
     /**
@@ -3028,7 +3030,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
             ConnectionKey id = ses.meta(CONN_IDX_META);
 
             if (id != null) {
-                ClusterNode node = getSpiContext().node(id.nodeId);
+                ClusterNode node = getSpiContext().node(id.nodeId());
 
                 if (node != null && node.isClient()) {
                     String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " +
@@ -3049,9 +3051,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     /**
      * @param node Node.
      * @return Node addresses.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Collection<InetSocketAddress> nodeAddresses(ClusterNode node) throws IgniteCheckedException {
+        return nodeAddresses(node, filterReachableAddresses);
+    }
+
+    /**
+     * @param node Node.
+     * @param filterReachableAddresses Filter addresses flag.
+     * @return Node addresses.
      * @throws IgniteCheckedException If node does not have addresses.
      */
-    private LinkedHashSet<InetSocketAddress> nodeAddresses(ClusterNode node) throws IgniteCheckedException {
+    public Collection<InetSocketAddress> nodeAddresses(ClusterNode node, boolean filterReachableAddresses)
+        throws IgniteCheckedException {
         Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS));
         Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
         Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
@@ -3132,7 +3145,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
      * @throws IgniteCheckedException If failed.
      */
     protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
-        LinkedHashSet<InetSocketAddress> addrs = nodeAddresses(node);
+        Collection<InetSocketAddress> addrs = nodeAddresses(node);
 
         GridCommunicationClient client = null;
         IgniteCheckedException errs = null;
@@ -4629,77 +4642,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     /**
      *
      */
-    private static class ConnectionKey {
-        /** */
-        private final UUID nodeId;
-
-        /** */
-        private final int idx;
-
-        /** */
-        private final long connCnt;
-
-        /**
-         * @param nodeId Node ID.
-         * @param idx Connection index.
-         * @param connCnt Connection counter (set only for incoming connections).
-         */
-        ConnectionKey(UUID nodeId, int idx, long connCnt) {
-            this.nodeId = nodeId;
-            this.idx = idx;
-            this.connCnt = connCnt;
-        }
-
-        /**
-         * @return Connection counter.
-         */
-        long connectCount() {
-            return connCnt;
-        }
-
-        /**
-         * @return Node ID.
-         */
-        UUID nodeId() {
-            return nodeId;
-        }
-
-        /**
-         * @return Connection index.
-         */
-        int connectionIndex() {
-            return idx;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            ConnectionKey key = (ConnectionKey) o;
-
-            return idx == key.idx && nodeId.equals(key.nodeId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = nodeId.hashCode();
-            res = 31 * res + idx;
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(ConnectionKey.class, this);
-        }
-    }
-
-    /**
-     *
-     */
     interface ConnectionPolicy {
         /**
          * @return Thread connection index.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ed492a4f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java
new file mode 100644
index 0000000..6716446
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Connection key consisting of remote node ID, connection index and connection counter.
+ */
+public class ConnectionKey {
+    /** Node ID, can be {@code null} (connection-check dummy key is created without node ID). */
+    private final UUID nodeId;
+
+    /** Connection index. */
+    private final int idx;
+
+    /** Connection counter, does not participate in {@link #equals(Object)} and {@link #hashCode()}. */
+    private final long connCnt;
+
+    /**
+     * @param nodeId Node ID.
+     * @param idx Connection index.
+     * @param connCnt Connection counter (set only for incoming connections).
+     */
+    public ConnectionKey(UUID nodeId, int idx, long connCnt) {
+        this.nodeId = nodeId;
+        this.idx = idx;
+        this.connCnt = connCnt;
+    }
+
+    /**
+     * @return Connection counter.
+     */
+    public long connectCount() {
+        return connCnt;
+    }
+
+    /**
+     * @return Node ID.
+     */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return Connection index.
+     */
+    public int connectionIndex() {
+        return idx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        ConnectionKey key = (ConnectionKey) o;
+
+        return idx == key.idx && (nodeId == null ? key.nodeId == null : nodeId.equals(key.nodeId));
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = nodeId != null ? nodeId.hashCode() : 0;
+        res = 31 * res + idx;
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ConnectionKey.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ed492a4f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
index e08ea13..99e1eca 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
@@ -17,84 +17,445 @@
 
 package org.apache.ignite.spi.communication.tcp.internal;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.channels.SocketChannel;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.GridLeanMap;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
-public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Boolean> {
+public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<BitSet> implements IgniteSpiTimeoutObject {
+    /** Session future. */
+    public static final int SES_FUT_META = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** Dummy key set as session meta to identify outgoing connection-check sessions. */
+    public static final ConnectionKey CONN_CHECK_DUMMY_KEY = new ConnectionKey(null, -1, -1);
+
+    /** Updater for {@link SingleAddressConnectFuture#done}. */
+    private static final AtomicIntegerFieldUpdater<SingleAddressConnectFuture> connFutDoneUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(SingleAddressConnectFuture.class, "done");
+
+    /** Updater for {@link MultipleAddressesConnectFuture#resCnt}. */
+    private static final AtomicIntegerFieldUpdater<MultipleAddressesConnectFuture> connResCntUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(MultipleAddressesConnectFuture.class, "resCnt");
+
+    /** Number of nodes for which connection status was already received. */
+    private final AtomicInteger resCntr = new AtomicInteger();
+
     /** */
-    private final UUID nodeId;
+    private final List<ClusterNode> nodes;
+
+    /** Per-node connect futures, element is {@code null} if no connect was started for the node. */
+    private volatile ConnectFuture[] futs;
 
     /** */
     private final GridNioServer nioSrvr;
 
     /** */
-    private Map<Integer, Object> sesMeta;
+    private final TcpCommunicationSpi spi;
+
+    /** Timeout object ID. */
+    private final IgniteUuid timeoutObjId = IgniteUuid.randomUuid();
+
+    /** Result, bit is set if the node with the same index in {@link #nodes} is reachable. */
+    private final BitSet resBitSet;
+
+    /** Timeout object end time. */
+    private long endTime;
 
     /** */
-    private SocketChannel ch;
+    private final IgniteLogger log;
 
     /**
-     * @param nodeId Remote note ID.
+     * @param spi SPI instance.
+     * @param log Logger.
+     * @param nioSrvr NIO server.
+     * @param nodes Nodes to check.
      */
-    public TcpCommunicationConnectionCheckFuture(GridNioServer nioSrvr, UUID nodeId) {
+    public TcpCommunicationConnectionCheckFuture(TcpCommunicationSpi spi,
+        IgniteLogger log,
+        GridNioServer nioSrvr,
+        List<ClusterNode> nodes)
+    {
+        this.spi = spi;
+        this.log = log;
         this.nioSrvr = nioSrvr;
-        this.nodeId = nodeId;
+        this.nodes = nodes;
+
+        resBitSet = new BitSet(nodes.size());
     }
 
     /**
-     * @param addr
-     * @throws IOException
+     * @param timeout Connect timeout.
      */
-    public void init(InetSocketAddress addr) throws IOException {
-        ch = SocketChannel.open();
+    public void init(long timeout) {
+        ConnectFuture[] futs = new ConnectFuture[nodes.size()];
+
+        UUID locId = spi.getSpiContext().localNode().id();
+
+        for (int i = 0; i < nodes.size(); i++) {
+            ClusterNode node = nodes.get(i);
+
+            if (!node.id().equals(locId)) {
+                if (spi.getSpiContext().node(node.id()) == null) {
+                    receivedConnectionStatus(i, false);
+
+                    continue;
+                }
+
+                Collection<InetSocketAddress> addrs;
+
+                try {
+                    addrs = spi.nodeAddresses(node, false);
+                }
+                catch (Exception e) {
+                    U.error(log, "Failed to get node addresses: " + node, e);
 
-        ch.configureBlocking(false);
+                    receivedConnectionStatus(i, false);
 
-        ch.socket().setTcpNoDelay(true);
-        ch.socket().setKeepAlive(false);
+                    continue;
+                }
 
-        boolean connect = ch.connect(addr);
+                if (addrs.size() == 1) {
+                    SingleAddressConnectFuture fut = new SingleAddressConnectFuture(i);
 
-        if (!connect) {
-            sesMeta = new GridLeanMap<>(2);
+                    fut.init(addrs.iterator().next());
 
-            sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, TcpCommunicationSpi.CONN_CHECK_DUMMY_KEY);
-            sesMeta.put(TcpCommunicationSpi.SES_FUT_META, this);
+                    futs[i] = fut;
+                }
+                else {
+                    MultipleAddressesConnectFuture fut = new MultipleAddressesConnectFuture(i);
 
-            nioSrvr.createSession(ch, sesMeta, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
-                @Override public void apply(IgniteInternalFuture<GridNioSession> fut) {
-                    if (fut.error() != null)
-                        onDone(false);
+                    fut.init(addrs);
+
+                    futs[i] = fut;
                 }
-            });
+            }
+            else
+                receivedConnectionStatus(i, true);
+        }
+
+        this.futs = futs;
+
+        if (!isDone()) {
+            endTime = System.currentTimeMillis() + timeout;
+
+            spi.getSpiContext().addTimeoutObject(this);
         }
     }
 
     /**
+     * @param idx Node index.
+     * @param res Success flag.
+     */
+    private void receivedConnectionStatus(int idx, boolean res) {
+        assert resCntr.get() < nodes.size();
+
+        synchronized (resBitSet) {
+            resBitSet.set(idx, res);
+        }
+
+        if (resCntr.incrementAndGet() == nodes.size())
+            onDone(resBitSet);
+    }
+
+    /**
+     * @param nodeIdx Node index.
+     * @return Node ID.
+     */
+    private UUID nodeId(int nodeIdx) {
+        return nodes.get(nodeIdx).id();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return timeoutObjId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long endTime() {
+        return endTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTimeout() {
+        if (isDone())
+            return;
+
+        ConnectFuture[] futs = this.futs;
+
+        for (int i = 0; i < futs.length; i++) {
+            ConnectFuture fut = futs[i];
+
+            if (fut != null)
+                fut.onTimeout();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(@Nullable BitSet res, @Nullable Throwable err) {
+        if (super.onDone(res, err)) {
+            spi.getSpiContext().removeTimeoutObject(this);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
      *
      */
-    public void onTimeout() {
-        if (super.onDone(false))
-            nioSrvr.cancelConnect(ch, sesMeta);
+    private interface ConnectFuture {
+        /** Invoked when the connection check timeout has elapsed. */
+        void onTimeout();
     }
 
     /**
-     * @param rmtNodeId
+     * Connection check of a single node address.
      */
-    public void onConnected(UUID rmtNodeId) {
-        onDone(nodeId.equals(rmtNodeId));
+    private class SingleAddressConnectFuture implements TcpCommunicationNodeConnectionCheckFuture, ConnectFuture {
+        /** Node index. */
+        final int nodeIdx;
+
+        /** Set to {@code 1} once the result of this connect attempt is reported. */
+        volatile int done;
+
+        /** Session meta passed to the NIO server together with the channel. */
+        Map<Integer, Object> sesMeta;
+
+        /** Channel used for the connect attempt. */
+        private SocketChannel ch;
+
+        /**
+         * @param nodeIdx Node index.
+         */
+        SingleAddressConnectFuture(int nodeIdx) {
+            this.nodeIdx = nodeIdx;
+        }
+
+        /**
+         * @param addr Node address.
+         */
+        public void init(InetSocketAddress addr) {
+            boolean connect;
+
+            try {
+                ch = SocketChannel.open();
+
+                ch.configureBlocking(false);
+
+                ch.socket().setTcpNoDelay(true);
+                ch.socket().setKeepAlive(false);
+
+                connect = ch.connect(addr);
+            }
+            catch (Exception e) {
+                finish(false);
+
+                return;
+            }
+
+            if (!connect) {
+                sesMeta = new GridLeanMap<>(3);
+
+                // Set dummy key to identify connection-check outgoing connection.
+                sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, CONN_CHECK_DUMMY_KEY);
+                sesMeta.put(SES_FUT_META, this);
+
+                nioSrvr.createSession(ch, sesMeta, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
+                    @Override public void apply(IgniteInternalFuture<GridNioSession> fut) {
+                        if (fut.error() != null)
+                            finish(false);
+                    }
+                });
+            }
+        }
+
+        /** Reports a failed status and cancels the pending connect unless a status was already reported. */
+        void cancel() {
+            if (finish(false))
+                nioSrvr.cancelConnect(ch, sesMeta);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
+            cancel();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onConnected(UUID rmtNodeId) {
+            finish(nodeId(nodeIdx).equals(rmtNodeId));
+        }
+
+        /**
+         * @param res Result.
+         * @return {@code True} if result was set by this call.
+         */
+        boolean finish(boolean res) {
+            if (connFutDoneUpdater.compareAndSet(this, 0, 1)) {
+                onStatusReceived(res);
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
+         * @param res Result.
+         */
+        void onStatusReceived(boolean res) {
+            receivedConnectionStatus(nodeIdx, res);
+        }
+    }
+
+    /**
+     * Connection check of a node with several addresses: node is reachable if any address is.
+     */
+    private class MultipleAddressesConnectFuture implements ConnectFuture {
+        /** Number of failed addresses, {@code Integer.MAX_VALUE} once some address succeeded. */
+        volatile int resCnt;
+
+        /** Per-address connect futures, element is {@code null} if connect was not started. */
+        volatile SingleAddressConnectFuture[] futs;
+
+        /** Node index. */
+        final int nodeIdx;
+
+        /**
+         * @param nodeIdx Node index.
+         */
+        MultipleAddressesConnectFuture(int nodeIdx) {
+            this.nodeIdx = nodeIdx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
+            SingleAddressConnectFuture[] futs = this.futs;
+
+            for (int i = 0; i < futs.length; i++) {
+                ConnectFuture fut = futs[i];
+
+                if (fut != null)
+                    fut.onTimeout();
+            }
+        }
+
+        /**
+         * @param addrs Node addresses.
+         */
+        void init(Collection<InetSocketAddress> addrs) {
+            SingleAddressConnectFuture[] futs = new SingleAddressConnectFuture[addrs.size()];
+
+            // Publish before the first connect: done() and receivedAddressStatus() read the array length.
+            this.futs = futs;
+
+            int idx = 0;
+
+            for (InetSocketAddress addr : addrs) {
+                SingleAddressConnectFuture fut = new SingleAddressConnectFuture(nodeIdx) {
+                    @Override void onStatusReceived(boolean res) {
+                        receivedAddressStatus(res);
+                    }
+                };
+
+                fut.init(addr);
+
+                futs[idx++] = fut;
+
+                if (done())
+                    break;
+            }
+
+            // Volatile re-write makes filled elements visible to other threads, then close race with completion.
+            this.futs = futs;
+
+            if (done())
+                cancelFutures();
+        }
+
+        /**
+         * @return {@code True} if node status is already known (some address connected or all addresses failed).
+         */
+        private boolean done() {
+            int resCnt0 = resCnt;
+
+            return resCnt0 == Integer.MAX_VALUE || resCnt0 == futs.length;
+        }
+
+        /**
+         *
+         */
+        private void cancelFutures() {
+            SingleAddressConnectFuture[] futs = this.futs;
+
+            if (futs != null) {
+                for (SingleAddressConnectFuture fut : futs) {
+                    // Element is null if connect to the corresponding address was not started.
+                    if (fut != null)
+                        fut.cancel();
+                }
+            }
+        }
+
+        /**
+         * @param res Result.
+         */
+        void receivedAddressStatus(boolean res) {
+            if (res) {
+                for (;;) {
+                    int resCnt0 = resCnt;
+
+                    if (resCnt0 == Integer.MAX_VALUE)
+                        return;
+
+                    if (connResCntUpdater.compareAndSet(this, resCnt0, Integer.MAX_VALUE)) {
+                        receivedConnectionStatus(nodeIdx, true);
+
+                        cancelFutures(); // Cancel others connects if they are still in progress.
+
+                        return;
+                    }
+                }
+            }
+            else {
+                for (;;) {
+                    int resCnt0 = resCnt;
+
+                    if (resCnt0 == Integer.MAX_VALUE)
+                        return;
+
+                    int resCnt1 = resCnt0 + 1;
+
+                    if (connResCntUpdater.compareAndSet(this, resCnt0, resCnt1)) {
+                        if (resCnt1 == futs.length)
+                            receivedConnectionStatus(nodeIdx, false);
+
+                        return;
+                    }
+                }
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ed492a4f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java
new file mode 100644
index 0000000..c034782
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import java.util.UUID;
+
+/**
+ * Connection check callback; {@code onConnected} is presumably invoked once the remote node connects (confirm).
+ */
+public interface TcpCommunicationNodeConnectionCheckFuture {
+    /**
+     * @param nodeId Remote node ID.
+     */
+    public void onConnected(UUID nodeId);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ed492a4f/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index caa2d87..e89a4c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -18,20 +18,23 @@
 package org.apache.ignite.spi.communication.tcp;
 
 import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.nio.GridCommunicationClient;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpiAdapter;
 import org.apache.ignite.spi.communication.CommunicationSpi;
 import org.apache.ignite.spi.communication.GridAbstractCommunicationSelfTest;
-import org.apache.ignite.spi.communication.GridTestMessage;
 import org.apache.ignite.testframework.GridTestUtils;
 
 /**
@@ -39,7 +42,7 @@ import org.apache.ignite.testframework.GridTestUtils;
  */
 abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunicationSelfTest<CommunicationSpi> {
     /** */
-    private static final int SPI_COUNT = 2;
+    private static final int SPI_COUNT = 3;
 
     /** */
     public static final int IDLE_CONN_TIMEOUT = 2000;
@@ -92,33 +95,65 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
         }
     }
 
-    public void testConnectionCheck() {
-        for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) {
-            UUID id = entry.getKey();
+    /**
+     * Checks 100 times that {@code checkConnection} on every SPI returns a set bit for each checked node.
+     */
+    public void testCheckConnection1() {
+        for (int i = 0; i < 100; i++) {
+            for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) {
+                TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue();
 
-            TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue();
+                List<ClusterNode> checkNodes = new ArrayList<>(nodes); // Whole node list, no filtering by SPI ID.
 
-            List<ClusterNode> checkNodes = new ArrayList<>();
+                assert checkNodes.size() > 1;
 
-            for (ClusterNode node : nodes) {
-                if (!id.equals(node.id()))
-                    checkNodes.add(node);
+                IgniteFuture<BitSet> fut = spi.checkConnection(checkNodes);
+
+                BitSet res = fut.get();
+
+                for (int n = 0; n < checkNodes.size(); n++) // Bit n is expected to correspond to checkNodes.get(n).
+                    assertTrue(res.get(n));
             }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCheckConnection2() throws Exception {
+        final int THREADS = spis.size(); // One asynchronous checker is started per SPI below.
+
+        final CyclicBarrier b = new CyclicBarrier(THREADS); // Lets all checkers start their loops together.
+
+        List<IgniteInternalFuture> futs = new ArrayList<>();
 
-            spi.checkConnection(checkNodes);
-
-            break;
-//            for (ClusterNode node : nodes) {
-//                synchronized (mux) {
-//                    if (!msgDestMap.containsKey(entry.getKey()))
-//                        msgDestMap.put(entry.getKey(), new HashSet<UUID>());
-//
-//                    msgDestMap.get(entry.getKey()).add(node.id());
-//                }
-//
-//                entry.getValue().sendMessage(node, new GridTestMessage(entry.getKey(), msgId++, 0));
-//            }
+        for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) {
+            final TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue();
+
+            futs.add(GridTestUtils.runAsync(new Callable() {
+                @Override public Object call() throws Exception {
+                    List<ClusterNode> checkNodes = new ArrayList<>(nodes);
+
+                    assert checkNodes.size() > 1;
+
+                    b.await(); // Block until every checker has reached this point.
+
+                    for (int i = 0; i < 100; i++) {
+                        IgniteFuture<BitSet> fut = spi.checkConnection(checkNodes);
+
+                        BitSet res = fut.get();
+
+                        for (int n = 0; n < checkNodes.size(); n++) // Every checked node must be reported as set.
+                            assertTrue(res.get(n));
+                    }
+
+                    return null;
+                }
+            }));
         }
+
+        for (IgniteInternalFuture f : futs) // Join all checkers; presumably rethrows their failures (confirm).
+            f.get();
     }
 
     /** {@inheritDoc} */