You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/24 14:33:03 UTC

[1/2] incubator-ignite git commit: master: back merge from ignite-752

Repository: incubator-ignite
Updated Branches:
  refs/heads/master ae148f1c5 -> cff25e91a


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index b7d6e3f..6130bd7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -65,12 +65,20 @@ import java.util.concurrent.atomic.*;
  * and then this info goes to coordinator. When coordinator processes join request
  * and issues node added messages and all other nodes then receive info about new node.
  * <h1 class="header">Failure Detection</h1>
- * Configuration defaults (see Configuration section below for details)
- * are chosen to make possible for discovery SPI work reliably on
- * most of hardware and virtual deployments, but this has made failure detection time worse.
+ * Configuration defaults (see Configuration section below and
+ * {@link IgniteConfiguration#getFailureDetectionTimeout()}) for details) are chosen to make possible for discovery
+ * SPI work reliably on most of hardware and virtual deployments, but this has made failure detection time worse.
  * <p>
- * For stable low-latency networks the following more aggressive settings are recommended
- * (which allows failure detection time ~200ms):
+ * If it's needed to tune failure detection then it's highly recommended to do this using
+ * {@link IgniteConfiguration#setFailureDetectionTimeout(long)}. This failure timeout automatically controls the
+ * following parameters: {@link #getSocketTimeout()}, {@link #getAckTimeout()}, {@link #getMaxAckTimeout()},
+ * {@link #getReconnectCount()}. If any of those parameters is set explicitly, then the failure timeout setting will be
+ * ignored.
+ * <p>
+ * If it's required to perform advanced settings of failure detection and
+ * {@link IgniteConfiguration#getFailureDetectionTimeout()} is unsuitable then various {@code TcpDiscoverySpi}
+ * configuration parameters may be used. As an example, for stable low-latency networks the following more aggressive
+ * settings are recommended (which allows failure detection time ~200ms):
  * <ul>
  * <li>Heartbeat frequency (see {@link #setHeartbeatFrequency(long)}) - 100ms</li>
  * <li>Socket timeout (see {@link #setSocketTimeout(long)}) - 200ms</li>
@@ -157,6 +165,15 @@ import java.util.concurrent.atomic.*;
 @DiscoverySpiOrderSupport(true)
 @DiscoverySpiHistorySupport(true)
 public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean {
+    /** Failure detection timeout feature major version. */
+    final static byte FAILURE_DETECTION_MAJOR_VER = 1;
+
+    /** Failure detection timeout feature minor version. */
+    final static byte FAILURE_DETECTION_MINOR_VER = 4;
+
+    /** Failure detection timeout feature maintainance version. */
+    final static byte FAILURE_DETECTION_MAINT_VER = 1;
+
     /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */
     public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs";
 
@@ -221,10 +238,10 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     protected TcpDiscoveryIpFinder ipFinder;
 
     /** Socket operations timeout. */
-    protected long sockTimeout; // Must be initialized in the constructor of child class.
+    private long sockTimeout; // Must be initialized in the constructor of child class.
 
     /** Message acknowledgement timeout. */
-    protected long ackTimeout; // Must be initialized in the constructor of child class.
+    private long ackTimeout; // Must be initialized in the constructor of child class.
 
     /** Network timeout. */
     protected long netTimeout = DFLT_NETWORK_TIMEOUT;
@@ -286,14 +303,14 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
     /** Reconnect attempts count. */
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
-    protected int reconCnt = DFLT_RECONNECT_CNT;
+    private int reconCnt = DFLT_RECONNECT_CNT;
 
     /** Statistics print frequency. */
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized", "RedundantFieldInitialization"})
     protected long statsPrintFreq = DFLT_STATS_PRINT_FREQ;
 
     /** Maximum message acknowledgement timeout. */
-    protected long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT;
+    private long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT;
 
     /** Max heartbeats count node can miss without initiating status check. */
     protected int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS;
@@ -521,6 +538,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * on every retry.
      * <p>
      * If not specified, default is {@link #DFLT_RECONNECT_CNT}.
+     * <p>
+     * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
      *
      * @param reconCnt Number of retries during message sending.
      * @see #setAckTimeout(long)
@@ -529,6 +548,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     public TcpDiscoverySpi setReconnectCount(int reconCnt) {
         this.reconCnt = reconCnt;
 
+        failureDetectionTimeoutEnabled(false);
+
         return this;
     }
 
@@ -548,6 +569,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * If not specified, default is {@link #DFLT_MAX_ACK_TIMEOUT}.
      * <p>
      * Affected server nodes only.
+     * <p>
+     * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
      *
      * @param maxAckTimeout Maximum acknowledgement timeout.
      */
@@ -555,6 +578,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     public TcpDiscoverySpi setMaxAckTimeout(long maxAckTimeout) {
         this.maxAckTimeout = maxAckTimeout;
 
+        failureDetectionTimeoutEnabled(false);
+
         return this;
     }
 
@@ -701,7 +726,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /**
      * Sets IP finder for IP addresses sharing and storing.
      * <p>
-     * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default.
+     * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will
+     * be used by default.
      *
      * @param ipFinder IP finder.
      */
@@ -720,6 +746,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * significantly greater than the default (e.g. to {@code 30000}).
      * <p>
      * If not specified, default is {@link #DFLT_SOCK_TIMEOUT} or {@link #DFLT_SOCK_TIMEOUT_CLIENT}.
+     * <p>
+     * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
      *
      * @param sockTimeout Socket connection timeout.
      */
@@ -727,6 +755,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     public TcpDiscoverySpi setSocketTimeout(long sockTimeout) {
         this.sockTimeout = sockTimeout;
 
+        failureDetectionTimeoutEnabled(false);
+
         return this;
     }
 
@@ -737,6 +767,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * and SPI tries to repeat message sending.
      * <p>
      * If not specified, default is {@link #DFLT_ACK_TIMEOUT} or {@link #DFLT_ACK_TIMEOUT_CLIENT}.
+     * <p>
+     * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
      *
      * @param ackTimeout Acknowledgement timeout.
      */
@@ -744,6 +776,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     public TcpDiscoverySpi setAckTimeout(long ackTimeout) {
         this.ackTimeout = ackTimeout;
 
+        failureDetectionTimeoutEnabled(false);
+
         return this;
     }
 
@@ -1123,10 +1157,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
     /**
      * @param sockAddr Remote address.
+     * @param timeoutHelper Timeout helper.
      * @return Opened socket.
      * @throws IOException If failed.
      */
-    protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
+    protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
+        throws IOException, IgniteSpiOperationTimeoutException {
         assert sockAddr != null;
 
         InetSocketAddress resolved = sockAddr.isUnresolved() ?
@@ -1142,9 +1178,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
         sock.setTcpNoDelay(true);
 
-        sock.connect(resolved, (int)sockTimeout);
+        sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout));
 
-        writeToSocket(sock, U.IGNITE_HEADER);
+        writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout));
 
         return sock;
     }
@@ -1154,14 +1190,15 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      *
      * @param sock Socket.
      * @param data Raw data to write.
+     * @param timeout Socket write timeout.
      * @throws IOException If IO failed or write timed out.
      */
     @SuppressWarnings("ThrowFromFinallyBlock")
-    protected void writeToSocket(Socket sock, byte[] data) throws IOException {
+    private void writeToSocket(Socket sock, byte[] data, long timeout) throws IOException {
         assert sock != null;
         assert data != null;
 
-        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
+        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
 
         addTimeoutObject(obj);
 
@@ -1197,11 +1234,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      *
      * @param sock Socket.
      * @param msg Message.
+     * @param timeout Socket write timeout.
      * @throws IOException If IO failed or write timed out.
      * @throws IgniteCheckedException If marshalling failed.
      */
-    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException, IgniteCheckedException {
-        writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024)); // 8K.
+    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
+        throws IOException, IgniteCheckedException {
+        writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024), timeout); // 8K.
     }
 
     /**
@@ -1214,8 +1253,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * @throws IgniteCheckedException If marshalling failed.
      */
     @SuppressWarnings("ThrowFromFinallyBlock")
-    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout)
-        throws IOException, IgniteCheckedException {
+    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout,
+        long timeout) throws IOException, IgniteCheckedException {
         assert sock != null;
         assert msg != null;
         assert bout != null;
@@ -1223,7 +1262,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         // Marshall message first to perform only write after.
         marsh.marshal(msg, bout);
 
-        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
+        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
 
         addTimeoutObject(obj);
 
@@ -1260,13 +1299,15 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * @param msg Received message.
      * @param sock Socket.
      * @param res Integer response.
+     * @param timeout Socket timeout.
      * @throws IOException If IO failed or write timed out.
      */
     @SuppressWarnings("ThrowFromFinallyBlock")
-    protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res) throws IOException {
+    protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout)
+        throws IOException {
         assert sock != null;
 
-        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
+        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
 
         addTimeoutObject(obj);
 
@@ -1307,7 +1348,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * @throws IOException If IO failed or read timed out.
      * @throws IgniteCheckedException If unmarshalling failed.
      */
-    protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, IgniteCheckedException {
+    protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException,
+        IgniteCheckedException {
         assert sock != null;
 
         int oldTimeout = sock.getSoTimeout();
@@ -1315,7 +1357,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         try {
             sock.setSoTimeout((int)timeout);
 
-            return marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader());
+            T res = marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader());
+
+            impl.onDataReceived();
+
+            return res;
         }
         catch (IOException | IgniteCheckedException e) {
             if (X.hasCause(e, SocketTimeoutException.class))
@@ -1356,6 +1402,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             if (res == -1)
                 throw new EOFException();
 
+            impl.onDataReceived();
+
             return res;
         }
         catch (SocketTimeoutException e) {
@@ -1570,6 +1618,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
     /** {@inheritDoc} */
     @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
+        initFailureDetectionTimeout();
+
         if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) {
             if (ackTimeout == 0)
                 ackTimeout = DFLT_ACK_TIMEOUT_CLIENT;
@@ -1591,18 +1641,21 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             impl = new ServerImpl(this);
         }
 
+        if (!failureDetectionTimeoutEnabled()) {
+            assertParameter(sockTimeout > 0, "sockTimeout > 0");
+            assertParameter(ackTimeout > 0, "ackTimeout > 0");
+            assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout");
+            assertParameter(reconCnt > 0, "reconnectCnt > 0");
+        }
+
+        assertParameter(netTimeout > 0, "networkTimeout > 0");
         assertParameter(ipFinder != null, "ipFinder != null");
         assertParameter(hbFreq > 0, "heartbeatFreq > 0");
-        assertParameter(netTimeout > 0, "networkTimeout > 0");
-        assertParameter(sockTimeout > 0, "sockTimeout > 0");
-        assertParameter(ackTimeout > 0, "ackTimeout > 0");
 
         assertParameter(ipFinderCleanFreq > 0, "ipFinderCleanFreq > 0");
         assertParameter(locPort > 1023, "localPort > 1023");
         assertParameter(locPortRange >= 0, "localPortRange >= 0");
         assertParameter(locPort + locPortRange <= 0xffff, "locPort + locPortRange <= 0xffff");
-        assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout");
-        assertParameter(reconCnt > 0, "reconnectCnt > 0");
         assertParameter(maxMissedHbs > 0, "maxMissedHeartbeats > 0");
         assertParameter(maxMissedClientHbs > 0, "maxMissedClientHeartbeats > 0");
         assertParameter(threadPri > 0, "threadPri > 0");
@@ -1620,11 +1673,20 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             log.debug(configInfo("localPort", locPort));
             log.debug(configInfo("localPortRange", locPortRange));
             log.debug(configInfo("threadPri", threadPri));
-            log.debug(configInfo("networkTimeout", netTimeout));
-            log.debug(configInfo("sockTimeout", sockTimeout));
-            log.debug(configInfo("ackTimeout", ackTimeout));
-            log.debug(configInfo("maxAckTimeout", maxAckTimeout));
-            log.debug(configInfo("reconnectCount", reconCnt));
+
+            if (!failureDetectionTimeoutEnabled()) {
+                log.debug("Failure detection timeout is ignored because at least one of the parameters from this list" +
+                    " has been set explicitly: 'sockTimeout', 'ackTimeout', 'maxAckTimeout', 'reconnectCount'.");
+
+                log.debug(configInfo("networkTimeout", netTimeout));
+                log.debug(configInfo("sockTimeout", sockTimeout));
+                log.debug(configInfo("ackTimeout", ackTimeout));
+                log.debug(configInfo("maxAckTimeout", maxAckTimeout));
+                log.debug(configInfo("reconnectCount", reconCnt));
+            }
+            else
+                log.debug(configInfo("failureDetectionTimeout", failureDetectionTimeout()));
+
             log.debug(configInfo("ipFinder", ipFinder));
             log.debug(configInfo("ipFinderCleanFreq", ipFinderCleanFreq));
             log.debug(configInfo("heartbeatFreq", hbFreq));
@@ -1837,7 +1899,10 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
                 U.closeQuiet(sock);
 
                 LT.warn(log, null, "Socket write has timed out (consider increasing " +
-                    "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']');
+                    (failureDetectionTimeoutEnabled() ?
+                    "'IgniteConfiguration.failureDetectionTimeout' configuration property) [" +
+                    "failureDetectionTimeout=" + failureDetectionTimeout() + ']' :
+                    "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']'));
 
                 stats.onSocketTimeout();
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 142dbea..44e9006 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -89,6 +89,9 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
     @GridToStringExclude
     private volatile long lastUpdateTime = U.currentTimeMillis();
 
+    /** The most recent time when a data chunk was received from a node. */
+    private volatile long lastDataReceivedTime = U.currentTimeMillis();
+
     /** Metrics provider (transient). */
     @GridToStringExclude
     private DiscoveryMetricsProvider metricsProvider;
@@ -390,6 +393,24 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
     }
 
     /**
+     * Gets the last time a node received a data chunk from a remote node.
+     *
+     * @return Time in milliseconds.
+     */
+    public long lastDataReceivedTime() {
+        return lastDataReceivedTime;
+    }
+
+    /**
+     * Sets the last time a node receive a data chunk from a remote node in a topology.
+     *
+     * @param lastDataReceivedTime Time in milliseconds.
+     */
+    public void lastDataReceivedTime(long lastDataReceivedTime) {
+        this.lastDataReceivedTime = lastDataReceivedTime;
+    }
+
+    /**
      * Gets visible flag.
      *
      * @return {@code true} if node is in visible state.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
new file mode 100644
index 0000000..c7e99c8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+
+import java.io.*;
+
+/**
+ * Message used to check whether a node is still connected to the topology.
+ * The difference from {@link TcpDiscoveryStatusCheckMessage} is that this message is sent to the next node
+ * which directly replies to the sender without message re-translation to the coordinator.
+ */
+public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMessage implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Default no-arg constructor for {@link Externalizable} interface.
+     */
+    public TcpDiscoveryConnectionCheckMessage() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param creatorNode Node created this message.
+     */
+    public TcpDiscoveryConnectionCheckMessage(TcpDiscoveryNode creatorNode) {
+        super(creatorNode.id());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        // This method has been left empty intentionally to keep message size at min.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        // This method has been left empty intentionally to keep message size at min.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoveryConnectionCheckMessage.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index fbaea11..7247d54 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -281,7 +281,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
         volatile CountDownLatch writeLatch;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
             throws IOException, IgniteCheckedException {
             if (msg instanceof TcpDiscoveryJoinRequestMessage) {
                 CountDownLatch writeLatch0 = writeLatch;
@@ -293,7 +293,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
                 }
             }
 
-            super.writeToSocket(sock, msg);
+            super.writeToSocket(sock, msg, timeout);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index eee38a5..538ead5 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -79,7 +79,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
         for (CommunicationSpi spi : spis.values()) {
             ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
 
-            assertEquals(2, clients.size());
+            assertEquals(getSpiCount() - 1, clients.size());
 
             clients.put(UUID.randomUUID(), F.first(clients.values()));
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index 1a4ba22..b4090d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -166,7 +166,8 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
                                 @Override public boolean apply() {
                                     return recoveryDesc.messagesFutures().isEmpty();
                                 }
-                            }, 10_000);
+                            }, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() + 7000 :
+                                10_000);
 
                             assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0,
                                 recoveryDesc.messagesFutures().size());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
new file mode 100644
index 0000000..a6bfe00
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ *
+ */
+public class GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest extends GridTcpCommunicationSpiRecoverySelfTest {
+    /** {@inheritDoc} */
+    @Override protected TcpCommunicationSpi getSpi(int idx) {
+        TcpCommunicationSpi spi = new TcpCommunicationSpi();
+
+        spi.setSharedMemoryPort(-1);
+        spi.setLocalPort(port++);
+        spi.setIdleConnectionTimeout(10_000);
+        spi.setAckSendThreshold(5);
+        spi.setSocketSendBuffer(512);
+        spi.setSocketReceiveBuffer(512);
+
+        return spi;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long awaitForSocketWriteTimeout() {
+        return IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT + 5_000;
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testFailureDetectionEnabled() throws Exception {
+        for (TcpCommunicationSpi spi: spis) {
+            assertTrue(spi.failureDetectionTimeoutEnabled());
+            assertTrue(spi.failureDetectionTimeout() == IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index 5d3afd9..67d42d3 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -60,7 +60,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
     private static final int ITERS = 10;
 
     /** */
-    private static int port = 30_000;
+    protected static int port = 30_000;
 
     /**
      *
@@ -163,6 +163,15 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
     }
 
     /**
+     * Time to wait for socket write timeout.
+     *
+     * @return Timeout.
+     */
+    protected long awaitForSocketWriteTimeout() {
+        return 5000;
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testBlockListener() throws Exception {
@@ -245,7 +254,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
             @Override public boolean apply() {
                 return lsnr0.rcvCnt.get() >= expMsgs && lsnr1.rcvCnt.get() >= expMsgs;
             }
-        }, 5000);
+        }, awaitForSocketWriteTimeout());
 
         assertEquals(expMsgs, lsnr0.rcvCnt.get());
         assertEquals(expMsgs, lsnr1.rcvCnt.get());
@@ -301,7 +310,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
                         @Override public boolean apply() {
                             return ses0.closeTime() != 0;
                         }
-                    }, 5000);
+                    }, awaitForSocketWriteTimeout());
 
                     assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
 
@@ -411,7 +420,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
                         @Override public boolean apply() {
                             return ses0.closeTime() != 0;
                         }
-                    }, 5000);
+                    }, awaitForSocketWriteTimeout());
 
                     assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
 
@@ -423,7 +432,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
                         public boolean apply() {
                             return ses1.closeTime() != 0;
                         }
-                    }, 5000);
+                    }, awaitForSocketWriteTimeout());
 
                     assertTrue("Failed to wait for session close", ses1.closeTime() != 0);
 
@@ -528,7 +537,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
                         @Override public boolean apply() {
                             return ses0.closeTime() != 0;
                         }
-                    }, 5000);
+                    }, awaitForSocketWriteTimeout());
 
                     assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
 
@@ -592,7 +601,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
 
                 return !sessions.isEmpty();
             }
-        }, 5000);
+        }, awaitForSocketWriteTimeout());
 
         Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java
new file mode 100644
index 0000000..56873d1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.communication.*;
+
+/**
+ *
+ */
+public class GridTcpCommunicationSpiTcpFailureDetectionSelfTest extends GridTcpCommunicationSpiTcpSelfTest {
+    /** */
+    private final static int SPI_COUNT = 4;
+
+    private TcpCommunicationSpi spis[] = new TcpCommunicationSpi[SPI_COUNT];
+
+    /** {@inheritDoc} */
+    @Override protected int getSpiCount() {
+        return SPI_COUNT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CommunicationSpi getSpi(int idx) {
+        TcpCommunicationSpi spi = (TcpCommunicationSpi)super.getSpi(idx);
+
+        switch (idx) {
+            case 0:
+                // Ignore
+                break;
+            case 1:
+                spi.setConnectTimeout(4000);
+                break;
+            case 2:
+                spi.setMaxConnectTimeout(TcpCommunicationSpi.DFLT_MAX_CONN_TIMEOUT);
+                break;
+            case 3:
+                spi.setReconnectCount(2);
+                break;
+            default:
+                assert false;
+        }
+
+        spis[idx] = spi;
+
+        return spi;
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testFailureDetectionEnabled() throws Exception {
+        assertTrue(spis[0].failureDetectionTimeoutEnabled());
+        assertTrue(spis[0].failureDetectionTimeout() == IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT);
+
+        for (int i = 1; i < SPI_COUNT; i++) {
+            assertFalse(spis[i].failureDetectionTimeoutEnabled());
+            assertEquals(0, spis[i].failureDetectionTimeout());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
index 61bb944..892d87d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
@@ -43,12 +43,18 @@ import static org.apache.ignite.lang.IgniteProductVersion.*;
 @SuppressWarnings({"JUnitAbstractTestClassNamingConvention"})
 public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends GridSpiAbstractTest<T> {
     /** */
-    private static final List<DiscoverySpi> spis = new ArrayList<>();
+    private static final String HTTP_ADAPTOR_MBEAN_NAME = "mbeanAdaptor:protocol=HTTP";
+
+    /** */
+    protected static final List<DiscoverySpi> spis = new ArrayList<>();
 
     /** */
     private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
 
     /** */
+    private static final List<HttpAdaptor> httpAdaptors = new ArrayList<>();
+
+    /** */
     private static long spiStartTime;
 
     /** */
@@ -424,10 +430,12 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri
         adaptor.setPort(Integer.valueOf(GridTestProperties.getProperty("discovery.mbeanserver.selftest.baseport")) +
             idx);
 
-        srv.registerMBean(adaptor, new ObjectName("mbeanAdaptor:protocol=HTTP"));
+        srv.registerMBean(adaptor, new ObjectName(HTTP_ADAPTOR_MBEAN_NAME));
 
         adaptor.start();
 
+        httpAdaptors.add(adaptor);
+
         return srv;
     }
 
@@ -442,12 +450,21 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri
             spi.spiStop();
         }
 
-        for (IgniteTestResources rscrs : spiRsrcs)
+        for (IgniteTestResources rscrs : spiRsrcs) {
+            MBeanServer mBeanServer = rscrs.getMBeanServer();
+
+            mBeanServer.unregisterMBean(new ObjectName(HTTP_ADAPTOR_MBEAN_NAME));
+
             rscrs.stopThreads();
+        }
+
+        for (HttpAdaptor adaptor : httpAdaptors)
+            adaptor.stop();
 
         // Clear.
         spis.clear();
         spiRsrcs.clear();
+        httpAdaptors.clear();
 
         spiStartTime = 0;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
new file mode 100644
index 0000000..3cf44f2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * Client-based discovery SPI test with failure detection timeout enabled.
+ */
+public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscoverySpiSelfTest {
+    /** */
+    private final static int FAILURE_AWAIT_TIME = 7_000;
+
+    /** */
+    private final static long FAILURE_THRESHOLD = 10_000;
+
+    /** */
+    private static long failureThreshold = FAILURE_THRESHOLD;
+
+    /** */
+    private static boolean useTestSpi;
+
+    /** {@inheritDoc} */
+    @Override protected boolean useFailureDetectionTimeout() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long failureDetectionTimeout() {
+        return failureThreshold;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long awaitTime() {
+        return failureDetectionTimeout() + FAILURE_AWAIT_TIME;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected TcpDiscoverySpi getDiscoverySpi() {
+        return useTestSpi ? new TestTcpDiscoverySpi() : super.getDiscoverySpi();
+    }
+
+    /**
+     * @throws Exception in case of error.
+     */
+    public void testFailureDetectionTimeoutEnabled() throws Exception {
+        startServerNodes(1);
+        startClientNodes(1);
+
+        checkNodes(1, 1);
+
+        assertTrue(((TcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi())).
+                failureDetectionTimeoutEnabled());
+        assertEquals(failureDetectionTimeout(),
+            ((TcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi())).failureDetectionTimeout());
+
+        assertTrue(((TcpDiscoverySpi)(G.ignite("client-0").configuration().getDiscoverySpi())).
+                failureDetectionTimeoutEnabled());
+        assertEquals(failureDetectionTimeout(),
+            ((TcpDiscoverySpi)(G.ignite("client-0").configuration().getDiscoverySpi())).failureDetectionTimeout());
+    }
+
+    /**
+     * @throws Exception in case of error.
+     */
+    public void testFailureTimeoutWorkabilityAvgTimeout() throws Exception {
+        failureThreshold = 3000;
+
+        try {
+            checkFailureThresholdWorkability();
+        }
+        finally {
+            failureThreshold = FAILURE_THRESHOLD;
+        }
+    }
+
+    /**
+     * @throws Exception in case of error.
+     */
+    public void testFailureTimeoutWorkabilitySmallTimeout() throws Exception {
+        failureThreshold = 500;
+
+        try {
+            checkFailureThresholdWorkability();
+        }
+        finally {
+            failureThreshold = FAILURE_THRESHOLD;
+        }
+    }
+
+    /**
+     * @throws Exception in case of error.
+     */
+    private void checkFailureThresholdWorkability() throws Exception {
+        useTestSpi = true;
+
+        TestTcpDiscoverySpi firstSpi = null;
+        TestTcpDiscoverySpi secondSpi = null;
+
+        try {
+            startServerNodes(2);
+
+            checkNodes(2, 0);
+
+            firstSpi = (TestTcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi());
+            secondSpi = (TestTcpDiscoverySpi)(G.ignite("server-1").configuration().getDiscoverySpi());
+
+            assert firstSpi.err == null;
+
+            secondSpi.readDelay = failureDetectionTimeout() + 5000;
+
+            assertFalse(firstSpi.pingNode(secondSpi.getLocalNodeId()));
+
+            Thread.sleep(failureDetectionTimeout());
+
+            assertTrue(firstSpi.err != null && X.hasCause(firstSpi.err, SocketTimeoutException.class));
+
+            firstSpi.reset();
+            secondSpi.reset();
+
+            assertTrue(firstSpi.pingNode(secondSpi.getLocalNodeId()));
+
+            assertTrue(firstSpi.err == null);
+        }
+        finally {
+            useTestSpi = false;
+
+            if (firstSpi != null)
+                firstSpi.reset();
+
+            if (secondSpi != null)
+                secondSpi.reset();
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** */
+        private long readDelay;
+
+        /** */
+        private Exception err;
+
+        /** {@inheritDoc} */
+        @Override protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout)
+            throws IOException, IgniteCheckedException {
+
+            if (readDelay < failureDetectionTimeout()) {
+                try {
+                    return super.readMessage(sock, in, timeout);
+                }
+                catch (Exception e) {
+                    err = e;
+
+                    throw e;
+                }
+            }
+            else {
+                T msg = super.readMessage(sock, in, timeout);
+
+                if (msg instanceof TcpDiscoveryPingRequest) {
+                    try {
+                        Thread.sleep(2000);
+                    } catch (InterruptedException e) {
+                        // Ignore
+                    }
+                    throw new SocketTimeoutException("Forced timeout");
+                }
+
+                return msg;
+            }
+        }
+
+        /**
+         * Resets testing state.
+         */
+        private void reset() {
+            readDelay = 0;
+            err = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 63db0c1..69a5f13 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -118,7 +118,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi();
+        TcpDiscoverySpi disco = getDiscoverySpi();
 
         disco.setMaxMissedClientHeartbeats(maxMissedClientHbs);
 
@@ -154,9 +154,19 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         else
             throw new IllegalArgumentException();
 
-        if (longSockTimeouts) {
-            disco.setAckTimeout(2000);
-            disco.setSocketTimeout(2000);
+        if (useFailureDetectionTimeout())
+            cfg.setFailureDetectionTimeout(failureDetectionTimeout());
+        else {
+            if (longSockTimeouts) {
+                disco.setAckTimeout(2000);
+                disco.setSocketTimeout(2000);
+            }
+            else {
+                disco.setAckTimeout(gridName.startsWith("client") ? TcpDiscoverySpi.DFLT_ACK_TIMEOUT_CLIENT :
+                    TcpDiscoverySpi.DFLT_ACK_TIMEOUT);
+                disco.setSocketTimeout(gridName.startsWith("client") ? TcpDiscoverySpi.DFLT_SOCK_TIMEOUT_CLIENT :
+                    TcpDiscoverySpi.DFLT_SOCK_TIMEOUT);
+            }
         }
 
         disco.setJoinTimeout(joinTimeout);
@@ -164,7 +174,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         disco.setClientReconnectDisabled(reconnectDisabled);
 
-        disco.afterWrite(afterWrite);
+        if (disco instanceof TestTcpDiscoverySpi)
+            ((TestTcpDiscoverySpi)disco).afterWrite(afterWrite);
 
         cfg.setDiscoverySpi(disco);
 
@@ -174,6 +185,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         return cfg;
     }
 
+    /**
+     * Returns TCP Discovery SPI instance to use in a test.
+     * @return TCP Discovery SPI.
+     */
+    protected TcpDiscoverySpi getDiscoverySpi() {
+        return new TestTcpDiscoverySpi();
+    }
+
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         Collection<InetSocketAddress> addrs = IP_FINDER.getRegisteredAddresses();
@@ -205,6 +224,24 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Checks whether to use failure detection timeout instead of setting explicit timeouts.
+     *
+     * @return {@code true} if use.
+     */
+    protected boolean useFailureDetectionTimeout() {
+        return false;
+    }
+
+    /**
+     * Gets failure detection timeout to use.
+     *
+     * @return Failure detection timeout.
+     */
+    protected long failureDetectionTimeout() {
+        return 0;
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testJoinTimeout() throws Exception {
@@ -390,12 +427,12 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         final CountDownLatch latch = new CountDownLatch(1);
 
-        ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure<Socket>() {
+        ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure
+            <Socket>() {
             @Override public void apply(Socket sock) {
                 try {
                     latch.await();
-                }
-                catch (InterruptedException e) {
+                } catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 }
             }
@@ -414,11 +451,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         startServerNodes(2);
         startClientNodes(1);
 
+        checkNodes(2, 1);
+
         Ignite srv0 = G.ignite("server-0");
         Ignite srv1 = G.ignite("server-1");
         Ignite client = G.ignite("client-0");
 
-        ((TcpDiscoverySpi)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000);
+        if (!useFailureDetectionTimeout())
+            ((TcpDiscoverySpi)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000);
 
         ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()).pauseSocketWrite();
 
@@ -756,8 +796,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
             @Override public void apply(TcpDiscoveryAbstractMessage msg) {
                 try {
                     Thread.sleep(1000000);
-                }
-                catch (InterruptedException ignored) {
+                } catch (InterruptedException ignored) {
                     Thread.interrupted();
                 }
             }
@@ -1405,8 +1444,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         latch.countDown();
 
-        assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
-        assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+        assertTrue(disconnectLatch.await(awaitTime(), MILLISECONDS));
+        assertTrue(reconnectLatch.await(awaitTime(), MILLISECONDS));
 
         clientNodeIds.add(client.cluster().localNode().id());
 
@@ -1474,7 +1513,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
      * @param failSrv If {@code true} fails server, otherwise server does not send join message.
      * @throws Exception If failed.
      */
-    private void reconnectSegmentedAfterJoinTimeout(boolean failSrv) throws Exception {
+    protected void reconnectSegmentedAfterJoinTimeout(boolean failSrv) throws Exception {
         netTimeout = 4000;
         joinTimeout = 5000;
 
@@ -1542,9 +1581,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
             clientSpi.brakeConnection();
         }
 
-        assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+        assertTrue(disconnectLatch.await(awaitTime(), MILLISECONDS));
 
-        assertTrue(segmentedLatch.await(10_000, MILLISECONDS));
+        assertTrue(segmentedLatch.await(awaitTime(), MILLISECONDS));
 
         waitSegmented(client);
 
@@ -1557,7 +1596,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
                 @Override public boolean apply() {
                     return srv.cluster().nodes().size() == 1;
                 }
-            }, 10_000);
+            }, awaitTime());
 
             checkNodes(1, 0);
         }
@@ -1614,7 +1653,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         srv.close();
 
-        assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+        assertTrue(disconnectLatch.await(awaitTime(), MILLISECONDS));
 
         srvNodeIds.clear();
         srvIdx.set(0);
@@ -1625,7 +1664,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         startServerNodes(1);
 
-        assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+        assertTrue(reconnectLatch.await(awaitTime(), MILLISECONDS));
 
         clientNodeIds.clear();
         clientNodeIds.add(client.cluster().localNode().id());
@@ -1695,7 +1734,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         clientSpi.brakeConnection();
 
-        assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+        assertTrue(disconnectLatch.await(awaitTime(), MILLISECONDS));
 
         log.info("Fail client connection2.");
 
@@ -1704,7 +1743,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         clientSpi.brakeConnection();
 
-        assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+        assertTrue(reconnectLatch.await(awaitTime(), MILLISECONDS));
 
         clientNodeIds.clear();
 
@@ -1715,7 +1754,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
             public boolean apply() {
                 return srv.cluster().nodes().size() == 2;
             }
-        }, 10_000);
+        }, awaitTime());
 
         checkNodes(1, 1);
 
@@ -1759,7 +1798,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
      * @param cnt Number of nodes.
      * @throws Exception In case of error.
      */
-    private void startServerNodes(int cnt) throws Exception {
+    protected void startServerNodes(int cnt) throws Exception {
         for (int i = 0; i < cnt; i++) {
             Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
 
@@ -1771,7 +1810,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
      * @param cnt Number of nodes.
      * @throws Exception In case of error.
      */
-    private void startClientNodes(int cnt) throws Exception {
+    protected void startClientNodes(int cnt) throws Exception {
         for (int i = 0; i < cnt; i++) {
             Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
 
@@ -1888,7 +1927,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
      * @param srvCnt Number of server nodes.
      * @param clientCnt Number of client nodes.
      */
-    private void checkNodes(int srvCnt, int clientCnt) {
+    protected void checkNodes(int srvCnt, int clientCnt) {
         long topVer = -1;
 
         for (int i = 0; i < srvCnt; i++) {
@@ -1950,8 +1989,17 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
      * @param latch Latch.
      * @throws InterruptedException If interrupted.
      */
-    private void await(CountDownLatch latch) throws InterruptedException {
-        assertTrue("Latch count: " + latch.getCount(), latch.await(10_000, MILLISECONDS));
+    protected void await(CountDownLatch latch) throws InterruptedException {
+        assertTrue("Latch count: " + latch.getCount(), latch.await(awaitTime(), MILLISECONDS));
+    }
+
+    /**
+     * Time to wait for operation completion.
+     *
+     * @return Time in milliseconds.
+     */
+    protected long awaitTime() {
+        return 10_000;
     }
 
     /**
@@ -2072,7 +2120,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException {
+            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
             waitFor(writeLock);
 
             boolean fail = false;
@@ -2097,17 +2145,18 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
                 sock.close();
             }
 
-            super.writeToSocket(sock, msg, bout);
+            super.writeToSocket(sock, msg, bout, timeout);
 
             if (afterWrite != null)
                 afterWrite.apply(msg, sock);
         }
 
         /** {@inheritDoc} */
-        @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
+        @Override protected Socket openSocket(InetSocketAddress sockAddr,
+            IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException {
             waitFor(openSockLock);
 
-            return super.openSocket(sockAddr);
+            return super.openSocket(sockAddr, new IgniteSpiOperationTimeoutHelper(this));
         }
 
         /**
@@ -2137,7 +2186,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res) throws IOException {
+        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout)
+            throws IOException {
             if (delayJoinAckFor != null && msg instanceof TcpDiscoveryJoinRequestMessage) {
                 TcpDiscoveryJoinRequestMessage msg0 = (TcpDiscoveryJoinRequestMessage)msg;
 
@@ -2155,7 +2205,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
                 }
             }
 
-            super.writeToSocket(msg, sock, res);
+            super.writeToSocket(msg, sock, res, timeout);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
index 3e895be..8ab2116 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi.discovery.tcp;
 
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.testframework.junits.spi.*;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
new file mode 100644
index 0000000..fbea187
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ *
+ */
+public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelfTest {
+    /** */
+    private static final int SPI_COUNT = 6;
+
+    /** */
+    private TcpDiscoveryIpFinder ipFinder =  new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected int getSpiCount() {
+        return SPI_COUNT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected DiscoverySpi getSpi(int idx) {
+        TestTcpDiscoverySpi spi = new TestTcpDiscoverySpi();
+
+        spi.setMetricsProvider(createMetricsProvider());
+        spi.setIpFinder(ipFinder);
+
+        switch (idx) {
+            case 0:
+            case 1:
+                // Ignore
+                break;
+            case 2:
+                spi.setAckTimeout(3000);
+                break;
+            case 3:
+                spi.setSocketTimeout(4000);
+                break;
+            case 4:
+                spi.setReconnectCount(4);
+                break;
+            case 5:
+                spi.setMaxAckTimeout(10000);
+                break;
+            default:
+                assert false;
+        }
+
+        return spi;
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testFailureDetectionTimeoutEnabled() throws Exception {
+        assertTrue(firstSpi().failureDetectionTimeoutEnabled());
+        assertTrue(secondSpi().failureDetectionTimeoutEnabled());
+
+        assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT.longValue(),
+                firstSpi().failureDetectionTimeout());
+        assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT.longValue(),
+                secondSpi().failureDetectionTimeout());
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testFailureDetectionTimeoutDisabled() throws Exception {
+        for (int i = 2; i < spis.size(); i++) {
+            assertFalse(((TcpDiscoverySpi)spis.get(i)).failureDetectionTimeoutEnabled());
+            assertEquals(0, ((TcpDiscoverySpi)spis.get(i)).failureDetectionTimeout());
+        }
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testFailureDetectionOnSocketOpen() throws Exception {
+        try {
+            ClusterNode node = secondSpi().getLocalNode();
+
+            firstSpi().openSocketTimeout = true;
+
+            assertFalse(firstSpi().pingNode(node.id()));
+            assertTrue(firstSpi().validTimeout);
+            assertTrue(firstSpi().err.getMessage().equals("Timeout: openSocketTimeout"));
+
+            firstSpi().openSocketTimeout = false;
+            firstSpi().openSocketTimeoutWait = true;
+
+            assertFalse(firstSpi().pingNode(node.id()));
+            assertTrue(firstSpi().validTimeout);
+            assertTrue(firstSpi().err.getMessage().equals("Timeout: openSocketTimeoutWait"));
+        }
+        finally {
+            firstSpi().resetState();
+        }
+    }
+
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testFailureDetectionOnSocketWrite() throws Exception {
+        try {
+            ClusterNode node = secondSpi().getLocalNode();
+
+            firstSpi().writeToSocketTimeoutWait = true;
+
+            assertFalse(firstSpi().pingNode(node.id()));
+            assertTrue(firstSpi().validTimeout);
+
+            firstSpi().writeToSocketTimeoutWait = false;
+
+            assertTrue(firstSpi().pingNode(node.id()));
+            assertTrue(firstSpi().validTimeout);
+        }
+        finally {
+            firstSpi().resetState();
+        }
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testConnectionCheckMessage() throws Exception {
+        TestTcpDiscoverySpi nextSpi = null;
+
+        try {
+            assert firstSpi().connCheckStatusMsgCntSent == 0;
+
+            TcpDiscoveryNode nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode();
+
+            assertNotNull(nextNode);
+
+            nextSpi = null;
+
+            for (int i = 1; i < spis.size(); i++)
+                if (spis.get(i).getLocalNode().id().equals(nextNode.id())) {
+                    nextSpi = (TestTcpDiscoverySpi)spis.get(i);
+                    break;
+                }
+
+            assertNotNull(nextSpi);
+
+            assert nextSpi.connCheckStatusMsgCntReceived == 0;
+
+            firstSpi().countConnCheckMsg = true;
+            nextSpi.countConnCheckMsg = true;
+
+            Thread.sleep(firstSpi().failureDetectionTimeout());
+
+            firstSpi().countConnCheckMsg = false;
+            nextSpi.countConnCheckMsg = false;
+
+            int sent = firstSpi().connCheckStatusMsgCntSent;
+            int received = nextSpi.connCheckStatusMsgCntReceived;
+
+            assert sent >= 3 && sent < 7 : "messages sent: " + sent;
+            assert received >= 3 && received < 7 : "messages received: " + received;
+        }
+        finally {
+            firstSpi().resetState();
+
+            if (nextSpi != null)
+                nextSpi.resetState();
+        }
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testConnectionCheckMessageBackwardCompatibility() throws Exception {
+        TestTcpDiscoverySpi nextSpi = null;
+        TcpDiscoveryNode nextNode = null;
+
+        IgniteProductVersion nextNodeVer = null;
+
+        try {
+            assert firstSpi().connCheckStatusMsgCntSent == 0;
+
+            nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode();
+
+            assertNotNull(nextNode);
+
+            nextSpi = null;
+
+            for (int i = 1; i < spis.size(); i++)
+                if (spis.get(i).getLocalNode().id().equals(nextNode.id())) {
+                    nextSpi = (TestTcpDiscoverySpi)spis.get(i);
+                    break;
+                }
+
+            assertNotNull(nextSpi);
+
+            assert nextSpi.connCheckStatusMsgCntReceived == 0;
+
+            nextNodeVer = nextNode.version();
+
+            // Overriding the version of the next node. Connection check message must not been sent to it.
+            nextNode.version(new IgniteProductVersion(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER,
+                (byte)(TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER - 1), TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER,
+                0l, null));
+
+            firstSpi().countConnCheckMsg = true;
+            nextSpi.countConnCheckMsg = true;
+
+            Thread.sleep(firstSpi().failureDetectionTimeout() / 2);
+
+            firstSpi().countConnCheckMsg = false;
+            nextSpi.countConnCheckMsg = false;
+
+            int sent = firstSpi().connCheckStatusMsgCntSent;
+            int received = nextSpi.connCheckStatusMsgCntReceived;
+
+            assert sent == 0 : "messages sent: " + sent;
+            assert received == 0 : "messages received: " + received;
+        }
+        finally {
+            firstSpi().resetState();
+
+            if (nextSpi != null)
+                nextSpi.resetState();
+
+            if (nextNode != null && nextNodeVer != null)
+                nextNode.version(nextNodeVer);
+        }
+    }
+
+    /**
+     * Returns the first spi with failure detection timeout enabled.
+     *
+     * @return SPI.
+     */
+    private TestTcpDiscoverySpi firstSpi() {
+        return (TestTcpDiscoverySpi)spis.get(0);
+    }
+
+
+    /**
+     * Returns the second spi with failure detection timeout enabled.
+     *
+     * @return SPI.
+     */
+    private TestTcpDiscoverySpi secondSpi() {
+        return (TestTcpDiscoverySpi)spis.get(1);
+    }
+
+    /**
+     *
+     */
+    private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** */
+        private volatile boolean openSocketTimeout;
+
+        /** */
+        private volatile boolean openSocketTimeoutWait;
+
+        /** */
+        private volatile boolean writeToSocketTimeoutWait;
+
+        /** */
+        private volatile boolean countConnCheckMsg;
+
+        /** */
+        private volatile int connCheckStatusMsgCntSent;
+
+        /** */
+        private volatile int connCheckStatusMsgCntReceived;
+
+        /** */
+        private volatile boolean validTimeout = true;
+
+        /** */
+        private volatile IgniteSpiOperationTimeoutException err;
+
+
+        /** {@inheritDoc} */
+        @Override protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
+            throws IOException, IgniteSpiOperationTimeoutException {
+
+            if (openSocketTimeout) {
+                err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeout");
+                throw err;
+            }
+            else if (openSocketTimeoutWait) {
+                long timeout = timeoutHelper.nextTimeoutChunk(0);
+
+                try {
+                    Thread.sleep(timeout + 1000);
+                }
+                catch (InterruptedException e) {
+                    // Ignore
+                }
+
+                try {
+                    timeoutHelper.nextTimeoutChunk(0);
+                }
+                catch (IgniteSpiOperationTimeoutException e) {
+                    throw (err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeoutWait"));
+                }
+            }
+
+            Socket sock = super.openSocket(sockAddr, timeoutHelper);
+
+            try {
+                Thread.sleep(1500);
+            } catch (InterruptedException e) {
+                // Ignore
+            }
+
+            return sock;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
+            throws IOException, IgniteCheckedException {
+            if (!(msg instanceof TcpDiscoveryPingRequest)) {
+                super.writeToSocket(sock, msg, timeout);
+                return;
+            }
+
+            if (timeout >= IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT) {
+                validTimeout = false;
+
+                throw new IgniteCheckedException("Invalid timeout: " + timeout);
+            }
+
+            if (writeToSocketTimeoutWait) {
+                try {
+                    Thread.sleep(timeout);
+                }
+                catch (InterruptedException e) {
+                    // Ignore
+                }
+            }
+            else
+                super.writeToSocket(sock, msg, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+            if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage)
+                connCheckStatusMsgCntSent++;
+
+            super.writeToSocket(sock, msg, bout, timeout);
+        }
+
+        /** {@inheritDoc} */
+        protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout)
+            throws IOException {
+            if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage)
+                connCheckStatusMsgCntReceived++;
+
+            super.writeToSocket(msg, sock, res, timeout);
+        }
+
+        /**
+         *
+         */
+        private void resetState() {
+            openSocketTimeout = false;
+            openSocketTimeoutWait = false;
+            writeToSocketTimeoutWait = false;
+            err = null;
+            validTimeout = true;
+            connCheckStatusMsgCntSent = 0;
+            connCheckStatusMsgCntReceived = 0;
+            countConnCheckMsg = false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index ff86bda..3f71d7d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -45,6 +45,9 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
         suite.addTest(new TestSuite(GridTcpCommunicationSpiMultithreadedSelfTest.class));
         suite.addTest(new TestSuite(GridTcpCommunicationSpiMultithreadedShmemTest.class));
 
+        suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.class));
+        suite.addTest(new TestSuite(GridTcpCommunicationSpiTcpFailureDetectionSelfTest.class));
+
         suite.addTest(new TestSuite(GridTcpCommunicationSpiConfigSelfTest.class));
 
         return suite;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index b7014ad..d77c432 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -44,6 +44,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
 
         suite.addTest(new TestSuite(TcpDiscoverySelfTest.class));
         suite.addTest(new TestSuite(TcpDiscoverySpiSelfTest.class));
+        suite.addTest(new TestSuite(TcpDiscoverySpiFailureTimeoutSelfTest.class));
         suite.addTest(new TestSuite(TcpDiscoverySpiStartStopSelfTest.class));
         suite.addTest(new TestSuite(TcpDiscoverySpiConfigSelfTest.class));
         suite.addTest(new TestSuite(TcpDiscoveryMarshallerCheckSelfTest.class));
@@ -54,6 +55,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
         suite.addTest(new TestSuite(TcpClientDiscoverySpiSelfTest.class));
         suite.addTest(new TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class));
         suite.addTest(new TestSuite(TcpClientDiscoverySpiMulticastTest.class));
+        suite.addTest(new TestSuite(TcpClientDiscoverySpiFailureTimeoutSelfTest.class));
 
         suite.addTest(new TestSuite(TcpDiscoveryNodeConsistentIdSelfTest.class));
         suite.addTest(new TestSuite(TcpDiscoveryNodeConfigConsistentIdSelfTest.class));


[2/2] incubator-ignite git commit: master: back merge from ignite-752

Posted by sb...@apache.org.
master: back merge from ignite-752


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cff25e91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cff25e91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cff25e91

Branch: refs/heads/master
Commit: cff25e91ac16fb11f3790690ec28d39a729519d9
Parents: ae148f1
Author: dmagda <ma...@gmail.com>
Authored: Fri Jul 24 15:32:51 2015 +0300
Committer: dmagda <ma...@gmail.com>
Committed: Fri Jul 24 15:32:51 2015 +0300

----------------------------------------------------------------------
 .../configuration/IgniteConfiguration.java      |  35 +-
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  58 +++
 .../spi/IgniteSpiOperationTimeoutException.java |  43 ++
 .../spi/IgniteSpiOperationTimeoutHelper.java    | 102 ++++
 .../communication/tcp/TcpCommunicationSpi.java  | 122 ++++-
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  52 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 509 +++++++++++--------
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |  11 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 135 +++--
 .../tcp/internal/TcpDiscoveryNode.java          |  21 +
 .../TcpDiscoveryConnectionCheckMessage.java     |  64 +++
 .../IgniteClientReconnectAbstractTest.java      |   4 +-
 .../GridTcpCommunicationSpiAbstractTest.java    |   2 +-
 ...dTcpCommunicationSpiRecoveryAckSelfTest.java |   3 +-
 ...tionSpiRecoveryFailureDetectionSelfTest.java |  54 ++
 ...GridTcpCommunicationSpiRecoverySelfTest.java |  23 +-
 ...unicationSpiTcpFailureDetectionSelfTest.java |  75 +++
 .../discovery/AbstractDiscoverySelfTest.java    |  23 +-
 ...lientDiscoverySpiFailureTimeoutSelfTest.java | 205 ++++++++
 .../tcp/TcpClientDiscoverySpiSelfTest.java      | 116 +++--
 .../tcp/TcpDiscoverySpiConfigSelfTest.java      |   1 +
 .../TcpDiscoverySpiFailureTimeoutSelfTest.java  | 402 +++++++++++++++
 .../IgniteSpiCommunicationSelfTestSuite.java    |   3 +
 .../IgniteSpiDiscoverySelfTestSuite.java        |   2 +
 24 files changed, 1749 insertions(+), 316 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 823ddcd..aac1754 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -190,6 +190,11 @@ public class IgniteConfiguration {
     /** Default value for cache sanity check enabled flag. */
     public static final boolean DFLT_CACHE_SANITY_CHECK_ENABLED = true;
 
+    /** Default failure detection timeout in millis. */
+    @SuppressWarnings("UnnecessaryBoxing")
+//    public static final Long DFLT_FAILURE_DETECTION_TIMEOUT = new Long(10_000);
+    public static final Long DFLT_FAILURE_DETECTION_TIMEOUT = new Long(10_000);
+
     /** Optional grid name. */
     private String gridName;
 
@@ -367,6 +372,9 @@ public class IgniteConfiguration {
     /** Port number range for time server. */
     private int timeSrvPortRange = DFLT_TIME_SERVER_PORT_RANGE;
 
+    /** Failure detection timeout. */
+    private Long failureDetectionTimeout = DFLT_FAILURE_DETECTION_TIMEOUT;
+
     /** Property names to include into node attributes. */
     private String[] includeProps;
 
@@ -449,7 +457,7 @@ public class IgniteConfiguration {
         consistentId = cfg.getConsistentId();
         deployMode = cfg.getDeploymentMode();
         discoStartupDelay = cfg.getDiscoveryStartupDelay();
-        pubPoolSize = cfg.getPublicThreadPoolSize();
+        failureDetectionTimeout = cfg.getFailureDetectionTimeout();
         ggHome = cfg.getIgniteHome();
         ggWork = cfg.getWorkDirectory();
         gridName = cfg.getGridName();
@@ -479,6 +487,7 @@ public class IgniteConfiguration {
         p2pMissedCacheSize = cfg.getPeerClassLoadingMissedResourcesCacheSize();
         p2pPoolSize = cfg.getPeerClassLoadingThreadPoolSize();
         pluginCfgs = cfg.getPluginConfigurations();
+        pubPoolSize = cfg.getPublicThreadPoolSize();
         segChkFreq = cfg.getSegmentCheckFrequency();
         segPlc = cfg.getSegmentationPolicy();
         segResolveAttempts = cfg.getSegmentationResolveAttempts();
@@ -1655,6 +1664,30 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Returns failure detection timeout used by {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}.
+     * <p>
+     * Default is {@link #DFLT_FAILURE_DETECTION_TIMEOUT}.
+     *
+     * @see #setFailureDetectionTimeout(long)
+     * @return Failure detection timeout in milliseconds.
+     */
+    public Long getFailureDetectionTimeout() {
+        return failureDetectionTimeout;
+    }
+
+    /**
+     * Sets failure detection timeout to use in {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}.
+     * <p>
+     * Failure detection timeout is used to determine how long the communication or discovery SPIs should wait before
+     * considering a remote connection failed.
+     *
+     * @param failureDetectionTimeout Failure detection timeout in milliseconds.
+     */
+    public void setFailureDetectionTimeout(long failureDetectionTimeout) {
+        this.failureDetectionTimeout = failureDetectionTimeout;
+    }
+
+    /**
      * Should return fully configured load balancing SPI implementation. If not provided,
      * {@link RoundRobinLoadBalancingSpi} will be used.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 2f3def9..f809d82 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
@@ -74,6 +75,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
     /** Local node. */
     private ClusterNode locNode;
 
+    /** Failure detection timeout usage switch. */
+    private boolean failureDetectionTimeoutEnabled = true;
+
+    /**
+     *  Failure detection timeout. Initialized with the value of
+     *  {@link IgniteConfiguration#getFailureDetectionTimeout()}.
+     */
+    private long failureDetectionTimeout;
+
     /**
      * Creates new adapter and initializes it from the current (this) class.
      * SPI name will be initialized to the simple name of the class
@@ -583,6 +593,54 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
     }
 
     /**
+     * Initiates and checks failure detection timeout value.
+     */
+    protected void initFailureDetectionTimeout() {
+        if (failureDetectionTimeoutEnabled) {
+            failureDetectionTimeout = ignite.configuration().getFailureDetectionTimeout();
+
+            if (failureDetectionTimeout <= 0)
+                throw new IgniteSpiException("Invalid failure detection timeout value: " + failureDetectionTimeout);
+            else if (failureDetectionTimeout <= 10)
+                // Because U.currentTimeInMillis() is updated once in 10 milliseconds.
+                log.warning("Failure detection timeout is too low, it may lead to unpredictable behaviour " +
+                    "[failureDetectionTimeout=" + failureDetectionTimeout + ']');
+        }
+        // Intentionally compare references using '!=' below
+        else if (ignite.configuration().getFailureDetectionTimeout() !=
+                IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT)
+            log.warning("Failure detection timeout will be ignored (one of SPI parameters has been set explicitly)");
+
+    }
+
+    /**
+     * Enables or disables failure detection timeout.
+     *
+     * @param enabled {@code true} if enable, {@code false} otherwise.
+     */
+    public void failureDetectionTimeoutEnabled(boolean enabled) {
+        failureDetectionTimeoutEnabled = enabled;
+    }
+
+    /**
+     * Checks whether failure detection timeout is enabled for this {@link IgniteSpi}.
+     *
+     * @return {@code true} if enabled, {@code false} otherwise.
+     */
+    public boolean failureDetectionTimeoutEnabled() {
+        return failureDetectionTimeoutEnabled;
+    }
+
+    /**
+     * Returns failure detection timeout set to use for network related operations.
+     *
+     * @return failure detection timeout in milliseconds or {@code 0} if the timeout is disabled.
+     */
+    public long failureDetectionTimeout() {
+        return failureDetectionTimeout;
+    }
+
+    /**
      * Temporarily SPI context.
      */
     private class GridDummySpiContext implements IgniteSpiContext {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
new file mode 100644
index 0000000..0e34cf2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi;
+
+import org.apache.ignite.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.configuration.*;
+
+/**
+ * Kind of exception that is used when failure detection timeout is enabled for {@link TcpDiscoverySpi} or
+ * {@link TcpCommunicationSpi}.
+ *
+ * For more information refer to {@link IgniteConfiguration#setFailureDetectionTimeout(long)} and
+ * {@link IgniteSpiOperationTimeoutHelper}.
+ */
+public class IgniteSpiOperationTimeoutException extends IgniteCheckedException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Constructor.
+     * @param msg Error message.
+     */
+    public IgniteSpiOperationTimeoutException(String msg) {
+        super(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
new file mode 100644
index 0000000..f7d8daa
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.spi;
+
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.net.*;
+
+/**
+ * Object that incorporates logic that determines a timeout value for the next network related operation and checks
+ * whether a failure detection timeout is reached or not.
+ *
+ * A new instance of the class should be created for every complex network based operations that usually consists of
+ * request and response parts.
+ */
+public class IgniteSpiOperationTimeoutHelper {
+    /** */
+    private long lastOperStartTs;
+
+    /** */
+    private long timeout;
+
+    /** */
+    private final boolean failureDetectionTimeoutEnabled;
+
+    /** */
+    private final long failureDetectionTimeout;
+
+    /**
+     * Constructor.
+     *
+     * @param adapter SPI adapter.
+     */
+    public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter) {
+        failureDetectionTimeoutEnabled = adapter.failureDetectionTimeoutEnabled();
+        failureDetectionTimeout = adapter.failureDetectionTimeout();
+    }
+
+    /**
+     * Returns a timeout value to use for the next network operation.
+     *
+     * If failure detection timeout is enabled then the returned value is a portion of time left since the last time
+     * this method is called. If the timeout is disabled then {@code dfltTimeout} is returned.
+     *
+     * @param dfltTimeout Timeout to use if failure detection timeout is disabled.
+     * @return Timeout in milliseconds.
+     * @throws IgniteSpiOperationTimeoutException If failure detection timeout is reached for an operation that uses
+     * this {@code IgniteSpiOperationTimeoutController}.
+     */
+    public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException {
+        if (!failureDetectionTimeoutEnabled)
+            return dfltTimeout;
+
+        if (lastOperStartTs == 0) {
+            timeout = failureDetectionTimeout;
+            lastOperStartTs = U.currentTimeMillis();
+        }
+        else {
+            long curTs = U.currentTimeMillis();
+
+            timeout = timeout - (curTs - lastOperStartTs);
+
+            lastOperStartTs = curTs;
+
+            if (timeout <= 0)
+                throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " +
+                    "'failureDetectionTimeout' configuration property [failureDetectionTimeout="
+                    + failureDetectionTimeout + ']');
+        }
+
+        return timeout;
+    }
+
+    /**
+     * Checks whether the given {@link Exception} is generated because failure detection timeout has been reached.
+     *
+     * @param e Exception.
+     * @return {@code true} if failure detection timeout is reached, {@code false} otherwise.
+     */
+    public boolean checkFailureTimeoutReached(Exception e) {
+        if (!failureDetectionTimeoutEnabled)
+            return false;
+
+        return e instanceof IgniteSpiOperationTimeoutException || e instanceof SocketTimeoutException ||
+            X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketException.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index e9fd696..7be1dbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -73,7 +73,21 @@ import static org.apache.ignite.events.EventType.*;
  * {@link #DFLT_IDLE_CONN_TIMEOUT} period and then are closed. Use
  * {@link #setIdleConnectionTimeout(long)} configuration parameter to configure
  * you own idle connection timeout.
+ * <h1 class="header">Failure Detection</h1>
+ * Configuration defaults (see Configuration section below and
+ * {@link IgniteConfiguration#getFailureDetectionTimeout()}) for details) are chosen to make possible for
+ * communication SPI work reliably on most of hardware and virtual deployments, but this has made failure detection
+ * time worse.
  * <p>
+ * If it's needed to tune failure detection then it's highly recommended to do this using
+ * {@link IgniteConfiguration#setFailureDetectionTimeout(long)}. This failure timeout automatically controls the
+ * following parameters: {@link #getConnectTimeout()}, {@link #getMaxConnectTimeout()},
+ * {@link #getReconnectCount()}. If any of those parameters is set explicitly, then the failure timeout setting will be
+ * ignored.
+ * <p>
+ * If it's required to perform advanced settings of failure detection and
+ * {@link IgniteConfiguration#getFailureDetectionTimeout()} is unsuitable then various {@code TcpCommunicationSpi}
+ * configuration parameters may be used.
  * <h1 class="header">Configuration</h1>
  * <h2 class="header">Mandatory</h2>
  * This SPI has no mandatory configuration parameters.
@@ -991,12 +1005,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * {@code 0} is interpreted as infinite timeout.
      * <p>
      * If not provided, default value is {@link #DFLT_CONN_TIMEOUT}.
+     * <p>
+     * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
      *
      * @param connTimeout Connect timeout.
      */
     @IgniteSpiConfiguration(optional = true)
     public void setConnectTimeout(long connTimeout) {
         this.connTimeout = connTimeout;
+
+        failureDetectionTimeoutEnabled(false);
     }
 
     /** {@inheritDoc} */
@@ -1013,12 +1031,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * {@code 0} is interpreted as infinite timeout.
      * <p>
      * If not provided, default value is {@link #DFLT_MAX_CONN_TIMEOUT}.
+     * <p>
+     * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
      *
      * @param maxConnTimeout Maximum connect timeout.
      */
     @IgniteSpiConfiguration(optional = true)
     public void setMaxConnectTimeout(long maxConnTimeout) {
         this.maxConnTimeout = maxConnTimeout;
+
+        failureDetectionTimeoutEnabled(false);
     }
 
     /** {@inheritDoc} */
@@ -1031,12 +1053,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * with remote nodes.
      * <p>
      * If not provided, default value is {@link #DFLT_RECONNECT_CNT}.
+     * <p>
+     * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
      *
      * @param reconCnt Maximum number of reconnection attempts.
      */
     @IgniteSpiConfiguration(optional = true)
     public void setReconnectCount(int reconCnt) {
         this.reconCnt = reconCnt;
+
+        failureDetectionTimeoutEnabled(false);
     }
 
     /** {@inheritDoc} */
@@ -1264,6 +1290,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /** {@inheritDoc} */
     @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
+        initFailureDetectionTimeout();
+
         assertParameter(locPort > 1023, "locPort > 1023");
         assertParameter(locPort <= 0xffff, "locPort < 0xffff");
         assertParameter(locPortRange >= 0, "locPortRange >= 0");
@@ -1272,10 +1300,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0");
         assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
         assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
-        assertParameter(reconCnt > 0, "reconnectCnt > 0");
         assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
-        assertParameter(connTimeout >= 0, "connTimeout >= 0");
-        assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
+
+        if (!failureDetectionTimeoutEnabled()) {
+            assertParameter(reconCnt > 0, "reconnectCnt > 0");
+            assertParameter(connTimeout >= 0, "connTimeout >= 0");
+            assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
+        }
+
         assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0");
         assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0");
         assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0");
@@ -1351,9 +1383,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             log.debug(configInfo("sockRcvBuf", sockRcvBuf));
             log.debug(configInfo("shmemPort", shmemPort));
             log.debug(configInfo("msgQueueLimit", msgQueueLimit));
-            log.debug(configInfo("connTimeout", connTimeout));
-            log.debug(configInfo("maxConnTimeout", maxConnTimeout));
-            log.debug(configInfo("reconCnt", reconCnt));
+
+            if (failureDetectionTimeoutEnabled()) {
+                log.debug(configInfo("connTimeout", connTimeout));
+                log.debug(configInfo("maxConnTimeout", maxConnTimeout));
+                log.debug(configInfo("reconCnt", reconCnt));
+            }
+            else
+                log.debug(configInfo("failureDetectionTimeout", failureDetectionTimeout()));
+
             log.debug(configInfo("sockWriteTimeout", sockWriteTimeout));
             log.debug(configInfo("ackSndThreshold", ackSndThreshold));
             log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize));
@@ -1906,17 +1944,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         long connTimeout0 = connTimeout;
 
+        IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this);
+
         while (true) {
             GridCommunicationClient client;
 
             try {
                 client = new GridShmemCommunicationClient(metricsLsnr,
                     port,
-                    connTimeout,
+                    timeoutHelper.nextTimeoutChunk(connTimeout),
                     log,
                     getSpiContext().messageFormatter());
             }
             catch (IgniteCheckedException e) {
+                if (timeoutHelper.checkFailureTimeoutReached(e))
+                    throw e;
+
                 // Reconnect for the second time, if connection is not established.
                 if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) {
                     connectAttempts++;
@@ -1928,15 +1971,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             try {
-                safeHandshake(client, null, node.id(), connTimeout0);
+                safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
             }
-            catch (HandshakeTimeoutException e) {
+            catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
+                client.forceClose();
+
+                if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
+                    timeoutHelper.checkFailureTimeoutReached(e))) {
+                    log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" +
+                        failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
+
+                    throw e;
+                }
+
+                assert !failureDetectionTimeoutEnabled();
+
                 if (log.isDebugEnabled())
-                    log.debug("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
+                    log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
                         ", err=" + e.getMessage() + ", client=" + client + ']');
 
-                client.forceClose();
-
                 if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
                     if (log.isDebugEnabled())
                         log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
@@ -2050,6 +2103,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
             int attempt = 1;
 
+            IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this);
+
             while (!conn) { // Reconnection on handshake timeout.
                 try {
                     SocketChannel ch = SocketChannel.open();
@@ -2076,9 +2131,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     long rcvCnt = -1;
 
                     try {
-                        ch.socket().connect(addr, (int)connTimeout);
+                        ch.socket().connect(addr, (int)timeoutHelper.nextTimeoutChunk(connTimeout));
 
-                        rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0);
+                        rcvCnt = safeHandshake(ch, recoveryDesc, node.id(),
+                            timeoutHelper.nextTimeoutChunk(connTimeout0));
 
                         if (rcvCnt == -1)
                             return null;
@@ -2112,19 +2168,43 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         }
                     }
                 }
-                catch (HandshakeTimeoutException e) {
+                catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
                     if (client != null) {
                         client.forceClose();
 
                         client = null;
                     }
 
+                    if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
+                        timeoutHelper.checkFailureTimeoutReached(e))) {
+
+                        String msg = "Handshake timed out (failure detection timeout is reached) " +
+                            "[failureDetectionTimeout=" + failureDetectionTimeout() + ", addr=" + addr + ']';
+
+                        onException(msg, e);
+
+                        if (log.isDebugEnabled())
+                            log.debug(msg);
+
+                        if (errs == null)
+                            errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
+                                "Make sure that each GridComputeTask and GridCacheTransaction has a timeout set " +
+                                "in order to prevent parties from waiting forever in case of network issues " +
+                                "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+
+                        errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+
+                        break;
+                    }
+
+                    assert !failureDetectionTimeoutEnabled();
+
                     onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
                         ", addr=" + addr + ']', e);
 
                     if (log.isDebugEnabled())
                         log.debug(
-                            "Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
+                            "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
                                 ", addr=" + addr + ", err=" + e + ']');
 
                     if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
@@ -2164,7 +2244,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (log.isDebugEnabled())
                         log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
 
-                    if (X.hasCause(e, SocketTimeoutException.class))
+                    boolean failureDetThrReached = timeoutHelper.checkFailureTimeoutReached(e);
+
+                    if (failureDetThrReached)
+                        LT.warn(log, null, "Connect timed out (consider increasing 'failureDetectionTimeout' " +
+                            "configuration property) [addr=" + addr + ", failureDetectionTimeout=" +
+                            failureDetectionTimeout() + ']');
+                    else if (X.hasCause(e, SocketTimeoutException.class))
                         LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " +
                             "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']');
 
@@ -2177,7 +2263,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
 
                     // Reconnect for the second time, if connection is not established.
-                    if (connectAttempts < 2 &&
+                    if (!failureDetThrReached && connectAttempts < 2 &&
                         (e instanceof ConnectException || X.hasCause(e, ConnectException.class))) {
                         connectAttempts++;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 572ba2c..12b10b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -480,13 +480,17 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         Collection<Throwable> errs = null;
 
-        long ackTimeout0 = spi.ackTimeout;
+        long ackTimeout0 = spi.getAckTimeout();
+
+        int reconCnt = 0;
 
         int connectAttempts = 1;
 
         UUID locNodeId = getLocalNodeId();
 
-        for (int i = 0; i < spi.reconCnt; i++) {
+        IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+
+        while (true) {
             boolean openSock = false;
 
             Socket sock = null;
@@ -494,7 +498,7 @@ class ClientImpl extends TcpDiscoveryImpl {
             try {
                 long tstamp = U.currentTimeMillis();
 
-                sock = spi.openSocket(addr);
+                sock = spi.openSocket(addr, timeoutHelper);
 
                 openSock = true;
 
@@ -502,7 +506,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 req.client(true);
 
-                spi.writeToSocket(sock, req);
+                spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
 
@@ -532,7 +536,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 msg.client(true);
 
-                spi.writeToSocket(sock, msg);
+                spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
 
@@ -540,7 +544,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                     log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr +
                         ", rmtNodeId=" + rmtNodeId + ']');
 
-                return new T3<>(sock, spi.readReceipt(sock, ackTimeout0), res.clientAck());
+                return new T3<>(sock, spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)),
+                    res.clientAck());
             }
             catch (IOException | IgniteCheckedException e) {
                 U.closeQuiet(sock);
@@ -555,6 +560,12 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 errs.add(e);
 
+                if (timeoutHelper.checkFailureTimeoutReached(e))
+                    break;
+
+                if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
+                    break;
+
                 if (!openSock) {
                     // Reconnect for the second time, if connection is not established.
                     if (connectAttempts < 2) {
@@ -566,7 +577,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                     break; // Don't retry if we can not establish connection.
                 }
 
-                if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
+                if (!spi.failureDetectionTimeoutEnabled() && (e instanceof SocketTimeoutException ||
+                    X.hasCause(e, SocketTimeoutException.class))) {
                     ackTimeout0 *= 2;
 
                     if (!checkAckTimeout(ackTimeout0))
@@ -868,6 +880,9 @@ class ClientImpl extends TcpDiscoveryImpl {
         private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
 
         /** */
+        private final long socketTimeout;
+
+        /** */
         private TcpDiscoveryAbstractMessage unackedMsg;
 
         /**
@@ -875,6 +890,9 @@ class ClientImpl extends TcpDiscoveryImpl {
          */
         protected SocketWriter() {
             super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);
+
+            socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+                spi.getSocketTimeout();
         }
 
         /**
@@ -968,12 +986,13 @@ class ClientImpl extends TcpDiscoveryImpl {
                         }
                     }
 
-                    spi.writeToSocket(sock, msg);
+                    spi.writeToSocket(sock, msg, socketTimeout);
 
                     msg = null;
 
                     if (ack) {
-                        long waitEnd = U.currentTimeMillis() + spi.ackTimeout;
+                        long waitEnd = U.currentTimeMillis() + (spi.failureDetectionTimeoutEnabled() ?
+                            spi.failureDetectionTimeout() : spi.getAckTimeout());
 
                         TcpDiscoveryAbstractMessage unacked;
 
@@ -989,7 +1008,10 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (unacked != null) {
                             if (log.isDebugEnabled())
                                 log.debug("Failed to get acknowledge for message, will try to reconnect " +
-                                "[msg=" + unacked + ", timeout=" + spi.ackTimeout + ']');
+                                "[msg=" + unacked +
+                                (spi.failureDetectionTimeoutEnabled() ?
+                                ", failureDetectionTimeout=" + spi.failureDetectionTimeout() :
+                                ", timeout=" + spi.getAckTimeout()) + ']');
 
                             throw new IOException("Failed to get acknowledge for message: " + unacked);
                         }
@@ -1068,11 +1090,11 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (join) {
                             joinError(new IgniteSpiException("Join process timed out, connection failed and " +
                                 "failed to reconnect (consider increasing 'joinTimeout' configuration property) " +
-                                "[networkTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+                                "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
                         }
                         else
-                            U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
-                                "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
+                            U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout'" +
+                                " configuration  property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
 
                         return;
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index dc343eb..b4f89ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -80,14 +80,6 @@ class ServerImpl extends TcpDiscoveryImpl {
     /** Client message workers. */
     protected ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new ConcurrentHashMap8<>();
 
-    /** Metrics sender. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    private HeartbeatsSender hbsSnd;
-
-    /** Status checker. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    private CheckStatusSender chkStatusSnd;
-
     /** IP finder cleaner. */
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     private IpFinderCleaner ipFinderCleaner;
@@ -229,12 +221,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         spi.stats.onJoinFinished();
 
-        hbsSnd = new HeartbeatsSender();
-        hbsSnd.start();
-
-        chkStatusSnd = new CheckStatusSender();
-        chkStatusSnd.start();
-
         if (spi.ipFinder.isShared()) {
             ipFinderCleaner = new IpFinderCleaner();
             ipFinderCleaner.start();
@@ -278,10 +264,10 @@ class ServerImpl extends TcpDiscoveryImpl {
             msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNode.id()));
 
             synchronized (mux) {
-                long threshold = U.currentTimeMillis() + spi.netTimeout;
-
                 long timeout = spi.netTimeout;
 
+                long threshold = U.currentTimeMillis() + timeout;
+
                 while (spiState != LEFT && timeout > 0) {
                     try {
                         mux.wait(timeout);
@@ -319,12 +305,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         U.interrupt(tmp);
         U.joinThreads(tmp, log);
 
-        U.interrupt(hbsSnd);
-        U.join(hbsSnd, log);
-
-        U.interrupt(chkStatusSnd);
-        U.join(chkStatusSnd, log);
-
         U.interrupt(ipFinderCleaner);
         U.join(ipFinderCleaner, log);
 
@@ -482,6 +462,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         UUID locNodeId = getLocalNodeId();
 
+        IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+
         if (F.contains(spi.locNodeAddrs, addr)) {
             if (clientNodeId == null)
                 return F.t(getLocalNodeId(), false);
@@ -494,7 +476,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             boolean clientPingRes;
 
             try {
-                clientPingRes = clientWorker.ping();
+                clientPingRes = clientWorker.ping(timeoutHelper);
             }
             catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
@@ -517,18 +499,26 @@ class ServerImpl extends TcpDiscoveryImpl {
             try {
                 Socket sock = null;
 
-                for (int i = 0; i < spi.reconCnt; i++) {
+                int reconCnt = 0;
+
+                boolean openedSock = false;
+
+                while (true) {
                     try {
                         if (addr.isUnresolved())
                             addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort());
 
                         long tstamp = U.currentTimeMillis();
 
-                        sock = spi.openSocket(addr);
+                        sock = spi.openSocket(addr, timeoutHelper);
 
-                        spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId));
+                        openedSock = true;
 
-                        TcpDiscoveryPingResponse res = spi.readMessage(sock, null, spi.netTimeout);
+                        spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId),
+                            timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+
+                        TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk(
+                            spi.getAckTimeout()));
 
                         if (locNodeId.equals(res.creatorNodeId())) {
                             if (log.isDebugEnabled())
@@ -550,6 +540,16 @@ class ServerImpl extends TcpDiscoveryImpl {
                             errs = new ArrayList<>();
 
                         errs.add(e);
+
+                        reconCnt++;
+
+                        if (!openedSock && reconCnt == 2)
+                            break;
+
+                        if (timeoutHelper.checkFailureTimeoutReached(e))
+                            break;
+                        else if (!spi.failureDetectionTimeoutEnabled() && reconCnt == spi.getReconnectCount())
+                            break;
                     }
                     finally {
                         U.closeQuiet(sock);
@@ -607,6 +607,12 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override protected void onDataReceived() {
+        if (spi.failureDetectionTimeoutEnabled() && locNode != null)
+            locNode.lastDataReceivedTime(U.currentTimeMillis());
+    }
+
     /**
      * Tries to join this node to topology.
      *
@@ -678,10 +684,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                 log.debug("Join request message has been sent (waiting for coordinator response).");
 
             synchronized (mux) {
-                long threshold = U.currentTimeMillis() + spi.netTimeout;
-
                 long timeout = spi.netTimeout;
 
+                long threshold = U.currentTimeMillis() + timeout;
+
                 while (spiState == CONNECTING && timeout > 0) {
                     try {
                         mux.wait(timeout);
@@ -883,15 +889,19 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         Collection<Throwable> errs = null;
 
-        long ackTimeout0 = spi.ackTimeout;
+        long ackTimeout0 = spi.getAckTimeout();
 
         int connectAttempts = 1;
 
-        boolean joinReqSent = false;
+        boolean joinReqSent;
 
         UUID locNodeId = getLocalNodeId();
 
-        for (int i = 0; i < spi.reconCnt; i++) {
+        IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+
+        int reconCnt = 0;
+
+        while (true){
             // Need to set to false on each new iteration,
             // since remote node may leave in the middle of the first iteration.
             joinReqSent = false;
@@ -903,14 +913,16 @@ class ServerImpl extends TcpDiscoveryImpl {
             try {
                 long tstamp = U.currentTimeMillis();
 
-                sock = spi.openSocket(addr);
+                sock = spi.openSocket(addr, timeoutHelper);
 
                 openSock = true;
 
                 // Handshake.
-                spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
+                spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk(
+                    spi.getSocketTimeout()));
 
-                TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
+                TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk(
+                    ackTimeout0));
 
                 if (locNodeId.equals(res.creatorNodeId())) {
                     if (log.isDebugEnabled())
@@ -924,7 +936,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 // Send message.
                 tstamp = U.currentTimeMillis();
 
-                spi.writeToSocket(sock, msg);
+                spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
 
@@ -941,7 +953,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 // E.g. due to class not found issue.
                 joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage;
 
-                return spi.readReceipt(sock, ackTimeout0);
+                return spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
             }
             catch (ClassCastException e) {
                 // This issue is rarely reproducible on AmazonEC2, but never
@@ -967,6 +979,12 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 errs.add(e);
 
+                if (timeoutHelper.checkFailureTimeoutReached(e))
+                    break;
+
+                if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
+                    break;
+
                 if (!openSock) {
                     // Reconnect for the second time, if connection is not established.
                     if (connectAttempts < 2) {
@@ -978,7 +996,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     break; // Don't retry if we can not establish connection.
                 }
 
-                if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
+                if (!spi.failureDetectionTimeoutEnabled() && (e instanceof SocketTimeoutException ||
+                    X.hasCause(e, SocketTimeoutException.class))) {
                     ackTimeout0 *= 2;
 
                     if (!checkAckTimeout(ackTimeout0))
@@ -1256,12 +1275,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         U.interrupt(tcpSrvr);
         U.join(tcpSrvr, log);
 
-        U.interrupt(hbsSnd);
-        U.join(hbsSnd, log);
-
-        U.interrupt(chkStatusSnd);
-        U.join(chkStatusSnd, log);
-
         U.interrupt(ipFinderCleaner);
         U.join(ipFinderCleaner, log);
 
@@ -1350,8 +1363,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             b.append("Internal threads: ").append(U.nl());
 
             b.append("    Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
-            b.append("    Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl());
-            b.append("    HB sender: ").append(threadStatus(hbsSnd)).append(U.nl());
+
             b.append("    IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl());
             b.append("    Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl());
 
@@ -1398,7 +1410,8 @@ class ServerImpl extends TcpDiscoveryImpl {
     private boolean recordable(TcpDiscoveryAbstractMessage msg) {
         return !(msg instanceof TcpDiscoveryHeartbeatMessage) &&
             !(msg instanceof TcpDiscoveryStatusCheckMessage) &&
-            !(msg instanceof TcpDiscoveryDiscardMessage);
+            !(msg instanceof TcpDiscoveryDiscardMessage) &&
+            !(msg instanceof TcpDiscoveryConnectionCheckMessage);
     }
 
     /**
@@ -1434,112 +1447,6 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /**
-     * Thread that sends heartbeats.
-     */
-    private class HeartbeatsSender extends IgniteSpiThread {
-        /**
-         * Constructor.
-         */
-        private HeartbeatsSender() {
-            super(spi.ignite().name(), "tcp-disco-hb-sender", log);
-
-            setPriority(spi.threadPri);
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("BusyWait")
-        @Override protected void body() throws InterruptedException {
-            while (!isLocalNodeCoordinator())
-                Thread.sleep(1000);
-
-            if (log.isDebugEnabled())
-                log.debug("Heartbeats sender has been started.");
-
-            UUID nodeId = getConfiguredNodeId();
-
-            while (!isInterrupted()) {
-                if (spiStateCopy() != CONNECTED) {
-                    if (log.isDebugEnabled())
-                        log.debug("Stopping heartbeats sender (SPI is not connected to topology).");
-
-                    return;
-                }
-
-                TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(nodeId);
-
-                msg.verify(getLocalNodeId());
-
-                msgWorker.addMessage(msg);
-
-                Thread.sleep(spi.hbFreq);
-            }
-        }
-    }
-
-    /**
-     * Thread that sends status check messages to next node if local node has not
-     * been receiving heartbeats ({@link TcpDiscoveryHeartbeatMessage})
-     * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} *
-     * {@link TcpDiscoverySpi#getHeartbeatFrequency()}.
-     */
-    private class CheckStatusSender extends IgniteSpiThread {
-        /**
-         * Constructor.
-         */
-        private CheckStatusSender() {
-            super(spi.ignite().name(), "tcp-disco-status-check-sender", log);
-
-            setPriority(spi.threadPri);
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("BusyWait")
-        @Override protected void body() throws InterruptedException {
-            if (log.isDebugEnabled())
-                log.debug("Status check sender has been started.");
-
-            // Only 1 heartbeat missing is acceptable. Add 50 ms to avoid false alarm.
-            long checkTimeout = (long)spi.maxMissedHbs * spi.hbFreq + 50;
-
-            long lastSent = 0;
-
-            while (!isInterrupted()) {
-                // 1. Determine timeout.
-                if (lastSent < locNode.lastUpdateTime())
-                    lastSent = locNode.lastUpdateTime();
-
-                long timeout = (lastSent + checkTimeout) - U.currentTimeMillis();
-
-                if (timeout > 0)
-                    Thread.sleep(timeout);
-
-                // 2. Check if SPI is still connected.
-                if (spiStateCopy() != CONNECTED) {
-                    if (log.isDebugEnabled())
-                        log.debug("Stopping status check sender (SPI is not connected to topology).");
-
-                    return;
-                }
-
-                // 3. Was there an update?
-                if (locNode.lastUpdateTime() > lastSent || !ring.hasRemoteNodes()) {
-                    if (log.isDebugEnabled())
-                        log.debug("Skipping status check send " +
-                            "[locNodeLastUpdate=" + U.format(locNode.lastUpdateTime()) +
-                            ", hasRmts=" + ring.hasRemoteNodes() + ']');
-
-                    continue;
-                }
-
-                // 4. Send status check message.
-                lastSent = U.currentTimeMillis();
-
-                msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null));
-            }
-        }
-    }
-
-    /**
      * Thread that cleans IP finder and keeps it in the correct state, unregistering
      * addresses of the nodes that has left the topology.
      * <p>
@@ -1861,10 +1768,49 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Socket. */
         private Socket sock;
 
+        /** Last time status message has been sent. */
+        private long lastTimeStatusMsgSent;
+
+        /** Incoming heartbeats check frequency. */
+        private long hbCheckFreq = (long)spi.maxMissedHbs * spi.hbFreq + 50;
+
+        /** Last time heartbeat message has been sent. */
+        private long lastTimeHbMsgSent;
+
+        /** Time when the last status message has been sent. */
+        private long lastTimeConnCheckMsgSent;
+
+        /** Flag that keeps info on whether the threshold is reached or not. */
+        private boolean failureThresholdReached;
+
+        /** Connection check frequency. */
+        private long connCheckFreq;
+
         /**
          */
         protected RingMessageWorker() {
-            super("tcp-disco-msg-worker");
+            super("tcp-disco-msg-worker", 10);
+
+            initConnectionCheckFrequency();
+        }
+
+        /**
+         * Initializes connection check frequency. Used only when failure detection timeout is enabled.
+         */
+        private void initConnectionCheckFrequency() {
+            if (spi.failureDetectionTimeoutEnabled()) {
+                for (int i = 3; i > 0; i--) {
+                    connCheckFreq = spi.failureDetectionTimeout() / i;
+
+                    if (connCheckFreq > 0)
+                        break;
+                }
+
+                assert connCheckFreq > 0;
+
+                if (log.isDebugEnabled())
+                    log.debug("Connection check frequency is calculated: " + connCheckFreq);
+            }
         }
 
         /**
@@ -1921,9 +1867,25 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (spi.ensured(msg))
                 msgHist.add(msg);
 
+            if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId()))
+                // Reset the flag.
+                failureThresholdReached = false;
+
             spi.stats.onMessageProcessingFinished(msg);
         }
 
+        /** {@inheritDoc} */
+        @Override protected void noMessageLoop() {
+            if (locNode == null)
+                return;
+
+            checkConnection();
+
+            sendHeartbeatMessage();
+
+            checkHeartbeatsReceiving();
+        }
+
         /**
          * Sends message across the ring.
          *
@@ -1990,7 +1952,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                         if (debugMode)
                             debugLog("No next node in topology.");
 
-                        if (ring.hasRemoteNodes()) {
+                        if (ring.hasRemoteNodes() && !(msg instanceof TcpDiscoveryConnectionCheckMessage) &&
+                            !(msg instanceof TcpDiscoveryStatusCheckMessage && msg.creatorNodeId().equals(locNodeId))) {
                             msg.senderNodeId(locNodeId);
 
                             addMessage(msg);
@@ -2027,7 +1990,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 List<InetSocketAddress> locNodeAddrs = U.arrayList(locNode.socketAddresses());
 
                 addr: for (InetSocketAddress addr : spi.getNodeAddresses(next, sameHost)) {
-                    long ackTimeout0 = spi.ackTimeout;
+                    long ackTimeout0 = spi.getAckTimeout();
 
                     if (locNodeAddrs.contains(addr)){
                         if (log.isDebugEnabled())
@@ -2037,8 +2000,15 @@ class ServerImpl extends TcpDiscoveryImpl {
                         continue;
                     }
 
-                    for (int i = 0; i < spi.reconCnt; i++) {
+                    int reconCnt = 0;
+
+                    IgniteSpiOperationTimeoutHelper timeoutHelper = null;
+
+                    while (true) {
                         if (sock == null) {
+                            if (timeoutHelper == null)
+                                timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+
                             nextNodeExists = false;
 
                             boolean success = false;
@@ -2049,14 +2019,16 @@ class ServerImpl extends TcpDiscoveryImpl {
                             try {
                                 long tstamp = U.currentTimeMillis();
 
-                                sock = spi.openSocket(addr);
+                                sock = spi.openSocket(addr, timeoutHelper);
 
                                 openSock = true;
 
                                 // Handshake.
-                                writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
+                                writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId),
+                                    timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
-                                TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
+                                TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
+                                    timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
                                 if (locNodeId.equals(res.creatorNodeId())) {
                                     if (log.isDebugEnabled())
@@ -2140,8 +2112,13 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 if (!openSock)
                                     break; // Don't retry if we can not establish connection.
 
-                                if (e instanceof SocketTimeoutException ||
-                                    X.hasCause(e, SocketTimeoutException.class)) {
+                                if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
+                                    break;
+
+                                if (timeoutHelper.checkFailureTimeoutReached(e))
+                                    break;
+                                else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof
+                                    SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) {
                                     ackTimeout0 *= 2;
 
                                     if (!checkAckTimeout(ackTimeout0))
@@ -2156,9 +2133,13 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                     sock = null;
                                 }
-                                else
+                                else {
                                     // Next node exists and accepts incoming messages.
                                     nextNodeExists = true;
+                                    // Resetting timeout control object to let the code below to use a new one
+                                    // for the next bunch of operations.
+                                    timeoutHelper = null;
+                                }
                             }
                         }
 
@@ -2195,8 +2176,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
                                         pendingMsgs.discardId);
 
+                                    if (timeoutHelper == null)
+                                        timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+
                                     try {
-                                        writeToSocket(sock, pendingMsg);
+                                        writeToSocket(sock, pendingMsg, timeoutHelper.nextTimeoutChunk(
+                                            spi.getSocketTimeout()));
                                     }
                                     finally {
                                         clearNodeAddedMessage(pendingMsg);
@@ -2204,7 +2189,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                     spi.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp);
 
-                                    int res = spi.readReceipt(sock, ackTimeout0);
+                                    int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
                                     if (log.isDebugEnabled())
                                         log.debug("Pending message has been sent to next node [msg=" + msg.id() +
@@ -2215,19 +2200,34 @@ class ServerImpl extends TcpDiscoveryImpl {
                                         debugLog("Pending message has been sent to next node [msg=" + msg.id() +
                                             ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
                                             ", res=" + res + ']');
+
+                                    // Resetting timeout control object to create a new one for the next bunch of
+                                    // operations.
+                                    timeoutHelper = null;
                                 }
                             }
 
-                            prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
+                            if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
+                                if (!next.version().greaterThanEqual(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER,
+                                    TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER,
+                                    TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER))
+                                    // Preserve backward compatibility with nodes of older versions.
+                                    msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
+                            }
+                            else
+                                prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
 
                             try {
                                 long tstamp = U.currentTimeMillis();
 
-                                writeToSocket(sock, msg);
+                                if (timeoutHelper == null)
+                                    timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+
+                                writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                                 spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
 
-                                int res = spi.readReceipt(sock, ackTimeout0);
+                                int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
                                 if (log.isDebugEnabled())
                                     log.debug("Message has been sent to next node [msg=" + msg +
@@ -2262,11 +2262,19 @@ class ServerImpl extends TcpDiscoveryImpl {
                             onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']',
                                 e);
 
-                            if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
-                                ackTimeout0 *= 2;
+                            if (timeoutHelper.checkFailureTimeoutReached(e))
+                                break;
 
-                                if (!checkAckTimeout(ackTimeout0))
+                            if (!spi.failureDetectionTimeoutEnabled()) {
+                                if (++reconCnt == spi.getReconnectCount())
                                     break;
+                                else if (e instanceof SocketTimeoutException ||
+                                    X.hasCause(e, SocketTimeoutException.class)) {
+                                    ackTimeout0 *= 2;
+
+                                    if (!checkAckTimeout(ackTimeout0))
+                                        break;
+                                }
                             }
                         }
                         finally {
@@ -2279,7 +2287,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                 if (log.isDebugEnabled())
                                     log.debug("Message has not been sent [next=" + next.id() + ", msg=" + msg +
-                                        ", i=" + i + ']');
+                                        (!spi.failureDetectionTimeoutEnabled() ? ", i=" + reconCnt : "") + ']');
                             }
                         }
                     } // Try to reconnect.
@@ -3342,7 +3350,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
                 else if (leftNode.equals(next) && sock != null) {
                     try {
-                        writeToSocket(sock, msg);
+                        writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
+                            spi.failureDetectionTimeout() : spi.getSocketTimeout());
 
                         if (log.isDebugEnabled())
                             log.debug("Sent verified node left message to leaving node: " + msg);
@@ -3991,6 +4000,77 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
         }
+
+        /**
+         * Sends heartbeat message if needed.
+         */
+        private void sendHeartbeatMessage() {
+            if (!isLocalNodeCoordinator())
+                return;
+
+            long elapsed = (lastTimeHbMsgSent + spi.hbFreq) - U.currentTimeMillis();
+
+            if (elapsed > 0)
+                return;
+
+            TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getConfiguredNodeId());
+
+            msg.verify(getLocalNodeId());
+
+            msgWorker.addMessage(msg);
+
+            lastTimeHbMsgSent = U.currentTimeMillis();
+        }
+
+        /**
+         * Check the last time a heartbeat message received. If the time is bigger than {@code hbCheckTimeout} than
+         * {@link TcpDiscoveryStatusCheckMessage} is sent accros the ring.
+         */
+        private void checkHeartbeatsReceiving() {
+            if (lastTimeStatusMsgSent < locNode.lastUpdateTime())
+                lastTimeStatusMsgSent = locNode.lastUpdateTime();
+
+            long elapsed = (lastTimeStatusMsgSent + hbCheckFreq) - U.currentTimeMillis();
+
+            if (elapsed > 0)
+                return;
+
+            msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null));
+
+            lastTimeStatusMsgSent = U.currentTimeMillis();
+        }
+
+        /**
+         * Check connection aliveness status.
+         */
+        private void checkConnection() {
+            if (!spi.failureDetectionTimeoutEnabled())
+                return;
+
+            if (!failureThresholdReached && U.currentTimeMillis() - locNode.lastDataReceivedTime()
+                >= spi.failureDetectionTimeout() && ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) {
+
+                log.info("Local node seems to be disconnected from topology (failure detection timeout " +
+                    "is reached): [failureDetectionTimeout=" + spi.failureDetectionTimeout() +
+                    ", connCheckFreq=" + connCheckFreq + ']');
+
+                failureThresholdReached = true;
+
+                // Reset sent time deliberately to force sending connection check message.
+                lastTimeConnCheckMsgSent = 0;
+            }
+
+            long elapsed = (lastTimeConnCheckMsgSent + connCheckFreq) - U.currentTimeMillis();
+
+            if (elapsed > 0)
+                return;
+
+            if (ring.hasRemoteNodes()) {
+                sendMessageAcrossRing(new TcpDiscoveryConnectionCheckMessage(locNode));
+
+                lastTimeConnCheckMsgSent = U.currentTimeMillis();
+            }
+        }
     }
 
     /**
@@ -4186,14 +4266,17 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                             TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId);
 
+                            IgniteSpiOperationTimeoutHelper timeoutHelper =
+                                new IgniteSpiOperationTimeoutHelper(spi);
+
                             if (req.clientNodeId() != null) {
                                 ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId());
 
                                 if (clientWorker != null)
-                                    res.clientExists(clientWorker.ping());
+                                    res.clientExists(clientWorker.ping(timeoutHelper));
                             }
 
-                            spi.writeToSocket(sock, res);
+                            spi.writeToSocket(sock, res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
                         }
                         else if (log.isDebugEnabled())
                             log.debug("Ignore ping request, node is stopping.");
@@ -4214,7 +4297,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     if (req.client())
                         res.clientAck(true);
 
-                    spi.writeToSocket(sock, res);
+                    spi.writeToSocket(sock, res, spi.failureDetectionTimeoutEnabled() ?
+                        spi.failureDetectionTimeout() : spi.getSocketTimeout());
 
                     // It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
                     // the local node sends a handshake request message on the loopback address, so we get here.
@@ -4323,6 +4407,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                     return;
                 }
 
+                long socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+                    spi.getSocketTimeout();
+
                 while (!isInterrupted()) {
                     try {
                         TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
@@ -4337,7 +4424,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                         if (debugMode && recordable(msg))
                             debugLog("Message has been received: " + msg);
 
-                        if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+                        if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
+                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+
+                            continue;
+                        }
+                        else if (msg instanceof TcpDiscoveryJoinRequestMessage) {
                             TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;
 
                             if (!req.responded()) {
@@ -4355,7 +4447,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 TcpDiscoverySpiState state = spiStateCopy();
 
                                 if (state == CONNECTED) {
-                                    spi.writeToSocket(msg, sock, RES_OK);
+                                    spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
 
                                     if (clientMsgWrk.getState() == State.NEW)
                                         clientMsgWrk.start();
@@ -4365,7 +4457,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     continue;
                                 }
                                 else {
-                                    spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN);
+                                    spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, socketTimeout);
 
                                     break;
                                 }
@@ -4373,7 +4465,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(msg, sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
 
                             boolean ignored = false;
 
@@ -4402,7 +4494,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryAuthFailedMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(msg, sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
 
                             boolean ignored = false;
 
@@ -4431,7 +4523,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryCheckFailedMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(msg, sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
 
                             boolean ignored = false;
 
@@ -4460,7 +4552,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(msg, sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
 
                             boolean ignored = false;
 
@@ -4509,7 +4601,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             clientMsgWrk.addMessage(ack);
                         }
                         else
-                            spi.writeToSocket(msg, sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
                     }
                     catch (IgniteCheckedException e) {
                         if (log.isDebugEnabled())
@@ -4610,8 +4702,11 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             TcpDiscoverySpiState state = spiStateCopy();
 
+            long socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+                spi.getSocketTimeout();
+
             if (state == CONNECTED) {
-                spi.writeToSocket(msg, sock, RES_OK);
+                spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
 
                 if (log.isDebugEnabled())
                     log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']');
@@ -4648,7 +4743,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     // Local node is stopping. Remote node should try next one.
                     res = RES_CONTINUE_JOIN;
 
-                spi.writeToSocket(msg, sock, res);
+                spi.writeToSocket(msg, sock, res, socketTimeout);
 
                 if (log.isDebugEnabled())
                     log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']');
@@ -4741,7 +4836,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param clientNodeId Node ID.
          */
         protected ClientMessageWorker(Socket sock, UUID clientNodeId) {
-            super("tcp-disco-client-message-worker");
+            super("tcp-disco-client-message-worker", 2000);
 
             this.sock = sock;
             this.clientNodeId = clientNodeId;
@@ -4791,7 +4886,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                             log.debug("Sending message ack to client [sock=" + sock + ", locNodeId="
                                 + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
 
-                        writeToSocket(sock, msg);
+                        writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
+                            spi.failureDetectionTimeout() : spi.getSocketTimeout());
                     }
                 }
                 else {
@@ -4802,7 +4898,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         prepareNodeAddedMessage(msg, clientNodeId, null, null);
 
-                        writeToSocket(sock, msg);
+                        writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
+                            spi.failureDetectionTimeout() : spi.getSocketTimeout());
                     }
                     finally {
                         clearNodeAddedMessage(msg);
@@ -4836,10 +4933,11 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * @param timeoutHelper Timeout controller.
          * @return Ping result.
          * @throws InterruptedException If interrupted.
          */
-        public boolean ping() throws InterruptedException {
+        public boolean ping(IgniteSpiOperationTimeoutHelper timeoutHelper) throws InterruptedException {
             if (spi.isNodeStopping0())
                 return false;
 
@@ -4865,7 +4963,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             try {
-                return fut.get(spi.ackTimeout, TimeUnit.MILLISECONDS);
+                return fut.get(timeoutHelper.nextTimeoutChunk(spi.getAckTimeout()),
+                    TimeUnit.MILLISECONDS);
             }
             catch (IgniteInterruptedCheckedException ignored) {
                 throw new InterruptedException();
@@ -4904,12 +5003,18 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Backed interrupted flag. */
         private volatile boolean interrupted;
 
+        /** Polling timeout. */
+        private final long pollingTimeout;
+
         /**
          * @param name Thread name.
+         * @param pollingTimeout Messages polling timeout.
          */
-        protected MessageWorkerAdapter(String name) {
+        protected MessageWorkerAdapter(String name, long pollingTimeout) {
             super(spi.ignite().name(), name, log);
 
+            this.pollingTimeout = pollingTimeout;
+
             setPriority(spi.threadPri);
         }
 
@@ -4919,12 +5024,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                 log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']');
 
             while (!isInterrupted()) {
-                TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS);
+                TcpDiscoveryAbstractMessage msg = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS);
 
                 if (msg == null)
-                    continue;
-
-                processMessage(msg);
+                    noMessageLoop();
+                else
+                    processMessage(msg);
             }
         }
 
@@ -4968,16 +5073,24 @@ class ServerImpl extends TcpDiscoveryImpl {
         protected abstract void processMessage(TcpDiscoveryAbstractMessage msg);
 
         /**
+         * Called when there is no message to process giving ability to perform other activity.
+         */
+        protected void noMessageLoop() {
+            // No-op.
+        }
+
+        /**
          * @param sock Socket.
          * @param msg Message.
+         * @param timeout Socket timeout.
          * @throws IOException If IO failed.
          * @throws IgniteCheckedException If marshalling failed.
          */
-        protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
+        protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
             throws IOException, IgniteCheckedException {
             bout.reset();
 
-            spi.writeToSocket(sock, msg, bout);
+            spi.writeToSocket(sock, msg, bout, timeout);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index c271b7c..4dacf45 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -131,6 +131,13 @@ abstract class TcpDiscoveryImpl {
     }
 
     /**
+     * Called when a chunk of data is received from a remote node.
+     */
+    protected void onDataReceived() {
+        // No-op
+    }
+
+    /**
      * @param log Logger.
      */
     public abstract void dumpDebugInfo(IgniteLogger log);
@@ -273,10 +280,10 @@ abstract class TcpDiscoveryImpl {
      * maximum acknowledgement timeout, {@code false} otherwise.
      */
     protected boolean checkAckTimeout(long ackTimeout) {
-        if (ackTimeout > spi.maxAckTimeout) {
+        if (ackTimeout > spi.getMaxAckTimeout()) {
             LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " +
                 "(consider increasing 'maxAckTimeout' configuration property) " +
-                "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.maxAckTimeout + ']');
+                "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.getMaxAckTimeout() + ']');
 
             return false;
         }