You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2015/07/27 11:45:27 UTC

[39/50] [abbrv] incubator-ignite git commit: master: back merge from ignite-752

master: back merge from ignite-752


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cff25e91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cff25e91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cff25e91

Branch: refs/heads/ignite-961
Commit: cff25e91ac16fb11f3790690ec28d39a729519d9
Parents: ae148f1
Author: dmagda <ma...@gmail.com>
Authored: Fri Jul 24 15:32:51 2015 +0300
Committer: dmagda <ma...@gmail.com>
Committed: Fri Jul 24 15:32:51 2015 +0300

----------------------------------------------------------------------
 .../configuration/IgniteConfiguration.java      |  35 +-
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  58 +++
 .../spi/IgniteSpiOperationTimeoutException.java |  43 ++
 .../spi/IgniteSpiOperationTimeoutHelper.java    | 102 ++++
 .../communication/tcp/TcpCommunicationSpi.java  | 122 ++++-
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  52 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 509 +++++++++++--------
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |  11 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 135 +++--
 .../tcp/internal/TcpDiscoveryNode.java          |  21 +
 .../TcpDiscoveryConnectionCheckMessage.java     |  64 +++
 .../IgniteClientReconnectAbstractTest.java      |   4 +-
 .../GridTcpCommunicationSpiAbstractTest.java    |   2 +-
 ...dTcpCommunicationSpiRecoveryAckSelfTest.java |   3 +-
 ...tionSpiRecoveryFailureDetectionSelfTest.java |  54 ++
 ...GridTcpCommunicationSpiRecoverySelfTest.java |  23 +-
 ...unicationSpiTcpFailureDetectionSelfTest.java |  75 +++
 .../discovery/AbstractDiscoverySelfTest.java    |  23 +-
 ...lientDiscoverySpiFailureTimeoutSelfTest.java | 205 ++++++++
 .../tcp/TcpClientDiscoverySpiSelfTest.java      | 116 +++--
 .../tcp/TcpDiscoverySpiConfigSelfTest.java      |   1 +
 .../TcpDiscoverySpiFailureTimeoutSelfTest.java  | 402 +++++++++++++++
 .../IgniteSpiCommunicationSelfTestSuite.java    |   3 +
 .../IgniteSpiDiscoverySelfTestSuite.java        |   2 +
 24 files changed, 1749 insertions(+), 316 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 823ddcd..aac1754 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -190,6 +190,11 @@ public class IgniteConfiguration {
     /** Default value for cache sanity check enabled flag. */
     public static final boolean DFLT_CACHE_SANITY_CHECK_ENABLED = true;
 
+    /** Default failure detection timeout in millis. */
+    @SuppressWarnings("UnnecessaryBoxing")
+//    public static final Long DFLT_FAILURE_DETECTION_TIMEOUT = new Long(10_000);
+    public static final Long DFLT_FAILURE_DETECTION_TIMEOUT = new Long(10_000);
+
     /** Optional grid name. */
     private String gridName;
 
@@ -367,6 +372,9 @@ public class IgniteConfiguration {
     /** Port number range for time server. */
     private int timeSrvPortRange = DFLT_TIME_SERVER_PORT_RANGE;
 
+    /** Failure detection timeout. */
+    private Long failureDetectionTimeout = DFLT_FAILURE_DETECTION_TIMEOUT;
+
     /** Property names to include into node attributes. */
     private String[] includeProps;
 
@@ -449,7 +457,7 @@ public class IgniteConfiguration {
         consistentId = cfg.getConsistentId();
         deployMode = cfg.getDeploymentMode();
         discoStartupDelay = cfg.getDiscoveryStartupDelay();
-        pubPoolSize = cfg.getPublicThreadPoolSize();
+        failureDetectionTimeout = cfg.getFailureDetectionTimeout();
         ggHome = cfg.getIgniteHome();
         ggWork = cfg.getWorkDirectory();
         gridName = cfg.getGridName();
@@ -479,6 +487,7 @@ public class IgniteConfiguration {
         p2pMissedCacheSize = cfg.getPeerClassLoadingMissedResourcesCacheSize();
         p2pPoolSize = cfg.getPeerClassLoadingThreadPoolSize();
         pluginCfgs = cfg.getPluginConfigurations();
+        pubPoolSize = cfg.getPublicThreadPoolSize();
         segChkFreq = cfg.getSegmentCheckFrequency();
         segPlc = cfg.getSegmentationPolicy();
         segResolveAttempts = cfg.getSegmentationResolveAttempts();
@@ -1655,6 +1664,30 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Returns failure detection timeout used by {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}.
+     * <p>
+     * Default is {@link #DFLT_FAILURE_DETECTION_TIMEOUT}.
+     *
+     * @see #setFailureDetectionTimeout(long)
+     * @return Failure detection timeout in milliseconds.
+     */
+    public Long getFailureDetectionTimeout() {
+        return failureDetectionTimeout;
+    }
+
+    /**
+     * Sets failure detection timeout to use in {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}.
+     * <p>
+     * Failure detection timeout is used to determine how long the communication or discovery SPIs should wait before
+     * considering a remote connection failed.
+     *
+     * @param failureDetectionTimeout Failure detection timeout in milliseconds.
+     */
+    public void setFailureDetectionTimeout(long failureDetectionTimeout) {
+        this.failureDetectionTimeout = failureDetectionTimeout;
+    }
+
+    /**
      * Should return fully configured load balancing SPI implementation. If not provided,
      * {@link RoundRobinLoadBalancingSpi} will be used.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 2f3def9..f809d82 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
@@ -74,6 +75,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
     /** Local node. */
     private ClusterNode locNode;
 
+    /** Failure detection timeout usage switch. */
+    private boolean failureDetectionTimeoutEnabled = true;
+
+    /**
+     *  Failure detection timeout. Initialized with the value of
+     *  {@link IgniteConfiguration#getFailureDetectionTimeout()}.
+     */
+    private long failureDetectionTimeout;
+
     /**
      * Creates new adapter and initializes it from the current (this) class.
      * SPI name will be initialized to the simple name of the class
@@ -583,6 +593,54 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
     }
 
     /**
+     * Initiates and checks failure detection timeout value.
+     */
+    protected void initFailureDetectionTimeout() {
+        if (failureDetectionTimeoutEnabled) {
+            failureDetectionTimeout = ignite.configuration().getFailureDetectionTimeout();
+
+            if (failureDetectionTimeout <= 0)
+                throw new IgniteSpiException("Invalid failure detection timeout value: " + failureDetectionTimeout);
+            else if (failureDetectionTimeout <= 10)
+                // Because U.currentTimeInMillis() is updated once in 10 milliseconds.
+                log.warning("Failure detection timeout is too low, it may lead to unpredictable behaviour " +
+                    "[failureDetectionTimeout=" + failureDetectionTimeout + ']');
+        }
+        // Intentionally compare references using '!=' below
+        else if (ignite.configuration().getFailureDetectionTimeout() !=
+                IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT)
+            log.warning("Failure detection timeout will be ignored (one of SPI parameters has been set explicitly)");
+
+    }
+
+    /**
+     * Enables or disables failure detection timeout.
+     *
+     * @param enabled {@code true} if enable, {@code false} otherwise.
+     */
+    public void failureDetectionTimeoutEnabled(boolean enabled) {
+        failureDetectionTimeoutEnabled = enabled;
+    }
+
+    /**
+     * Checks whether failure detection timeout is enabled for this {@link IgniteSpi}.
+     *
+     * @return {@code true} if enabled, {@code false} otherwise.
+     */
+    public boolean failureDetectionTimeoutEnabled() {
+        return failureDetectionTimeoutEnabled;
+    }
+
+    /**
+     * Returns failure detection timeout set to use for network related operations.
+     *
+     * @return failure detection timeout in milliseconds or {@code 0} if the timeout is disabled.
+     */
+    public long failureDetectionTimeout() {
+        return failureDetectionTimeout;
+    }
+
+    /**
      * Temporarily SPI context.
      */
     private class GridDummySpiContext implements IgniteSpiContext {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
new file mode 100644
index 0000000..0e34cf2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi;
+
+import org.apache.ignite.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.configuration.*;
+
+/**
+ * Kind of exception that is used when failure detection timeout is enabled for {@link TcpDiscoverySpi} or
+ * {@link TcpCommunicationSpi}.
+ *
+ * For more information refer to {@link IgniteConfiguration#setFailureDetectionTimeout(long)} and
+ * {@link IgniteSpiOperationTimeoutHelper}.
+ */
+public class IgniteSpiOperationTimeoutException extends IgniteCheckedException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Constructor.
+     * @param msg Error message.
+     */
+    public IgniteSpiOperationTimeoutException(String msg) {
+        super(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
new file mode 100644
index 0000000..f7d8daa
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.spi;
+
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.net.*;
+
+/**
+ * Object that incorporates logic that determines a timeout value for the next network related operation and checks
+ * whether a failure detection timeout is reached or not.
+ *
+ * A new instance of the class should be created for every complex network based operations that usually consists of
+ * request and response parts.
+ */
+public class IgniteSpiOperationTimeoutHelper {
+    /** */
+    private long lastOperStartTs;
+
+    /** */
+    private long timeout;
+
+    /** */
+    private final boolean failureDetectionTimeoutEnabled;
+
+    /** */
+    private final long failureDetectionTimeout;
+
+    /**
+     * Constructor.
+     *
+     * @param adapter SPI adapter.
+     */
+    public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter) {
+        failureDetectionTimeoutEnabled = adapter.failureDetectionTimeoutEnabled();
+        failureDetectionTimeout = adapter.failureDetectionTimeout();
+    }
+
+    /**
+     * Returns a timeout value to use for the next network operation.
+     *
+     * If failure detection timeout is enabled then the returned value is a portion of time left since the last time
+     * this method is called. If the timeout is disabled then {@code dfltTimeout} is returned.
+     *
+     * @param dfltTimeout Timeout to use if failure detection timeout is disabled.
+     * @return Timeout in milliseconds.
+     * @throws IgniteSpiOperationTimeoutException If failure detection timeout is reached for an operation that uses
+     * this {@code IgniteSpiOperationTimeoutController}.
+     */
+    public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException {
+        if (!failureDetectionTimeoutEnabled)
+            return dfltTimeout;
+
+        if (lastOperStartTs == 0) {
+            timeout = failureDetectionTimeout;
+            lastOperStartTs = U.currentTimeMillis();
+        }
+        else {
+            long curTs = U.currentTimeMillis();
+
+            timeout = timeout - (curTs - lastOperStartTs);
+
+            lastOperStartTs = curTs;
+
+            if (timeout <= 0)
+                throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " +
+                    "'failureDetectionTimeout' configuration property [failureDetectionTimeout="
+                    + failureDetectionTimeout + ']');
+        }
+
+        return timeout;
+    }
+
+    /**
+     * Checks whether the given {@link Exception} is generated because failure detection timeout has been reached.
+     *
+     * @param e Exception.
+     * @return {@code true} if failure detection timeout is reached, {@code false} otherwise.
+     */
+    public boolean checkFailureTimeoutReached(Exception e) {
+        if (!failureDetectionTimeoutEnabled)
+            return false;
+
+        return e instanceof IgniteSpiOperationTimeoutException || e instanceof SocketTimeoutException ||
+            X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketException.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index e9fd696..7be1dbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -73,7 +73,21 @@ import static org.apache.ignite.events.EventType.*;
  * {@link #DFLT_IDLE_CONN_TIMEOUT} period and then are closed. Use
  * {@link #setIdleConnectionTimeout(long)} configuration parameter to configure
  * you own idle connection timeout.
+ * <h1 class="header">Failure Detection</h1>
+ * Configuration defaults (see Configuration section below and
+ * {@link IgniteConfiguration#getFailureDetectionTimeout()}) for details) are chosen to make possible for
+ * communication SPI work reliably on most of hardware and virtual deployments, but this has made failure detection
+ * time worse.
  * <p>
+ * If it's needed to tune failure detection then it's highly recommended to do this using
+ * {@link IgniteConfiguration#setFailureDetectionTimeout(long)}. This failure timeout automatically controls the
+ * following parameters: {@link #getConnectTimeout()}, {@link #getMaxConnectTimeout()},
+ * {@link #getReconnectCount()}. If any of those parameters is set explicitly, then the failure timeout setting will be
+ * ignored.
+ * <p>
+ * If it's required to perform advanced settings of failure detection and
+ * {@link IgniteConfiguration#getFailureDetectionTimeout()} is unsuitable then various {@code TcpCommunicationSpi}
+ * configuration parameters may be used.
  * <h1 class="header">Configuration</h1>
  * <h2 class="header">Mandatory</h2>
  * This SPI has no mandatory configuration parameters.
@@ -991,12 +1005,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * {@code 0} is interpreted as infinite timeout.
      * <p>
      * If not provided, default value is {@link #DFLT_CONN_TIMEOUT}.
+     * <p>
+     * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
      *
      * @param connTimeout Connect timeout.
      */
     @IgniteSpiConfiguration(optional = true)
     public void setConnectTimeout(long connTimeout) {
         this.connTimeout = connTimeout;
+
+        failureDetectionTimeoutEnabled(false);
     }
 
     /** {@inheritDoc} */
@@ -1013,12 +1031,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * {@code 0} is interpreted as infinite timeout.
      * <p>
      * If not provided, default value is {@link #DFLT_MAX_CONN_TIMEOUT}.
+     * <p>
+     * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
      *
      * @param maxConnTimeout Maximum connect timeout.
      */
     @IgniteSpiConfiguration(optional = true)
     public void setMaxConnectTimeout(long maxConnTimeout) {
         this.maxConnTimeout = maxConnTimeout;
+
+        failureDetectionTimeoutEnabled(false);
     }
 
     /** {@inheritDoc} */
@@ -1031,12 +1053,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * with remote nodes.
      * <p>
      * If not provided, default value is {@link #DFLT_RECONNECT_CNT}.
+     * <p>
+     * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
      *
      * @param reconCnt Maximum number of reconnection attempts.
      */
     @IgniteSpiConfiguration(optional = true)
     public void setReconnectCount(int reconCnt) {
         this.reconCnt = reconCnt;
+
+        failureDetectionTimeoutEnabled(false);
     }
 
     /** {@inheritDoc} */
@@ -1264,6 +1290,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /** {@inheritDoc} */
     @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
+        initFailureDetectionTimeout();
+
         assertParameter(locPort > 1023, "locPort > 1023");
         assertParameter(locPort <= 0xffff, "locPort < 0xffff");
         assertParameter(locPortRange >= 0, "locPortRange >= 0");
@@ -1272,10 +1300,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0");
         assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
         assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
-        assertParameter(reconCnt > 0, "reconnectCnt > 0");
         assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
-        assertParameter(connTimeout >= 0, "connTimeout >= 0");
-        assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
+
+        if (!failureDetectionTimeoutEnabled()) {
+            assertParameter(reconCnt > 0, "reconnectCnt > 0");
+            assertParameter(connTimeout >= 0, "connTimeout >= 0");
+            assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
+        }
+
         assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0");
         assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0");
         assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0");
@@ -1351,9 +1383,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             log.debug(configInfo("sockRcvBuf", sockRcvBuf));
             log.debug(configInfo("shmemPort", shmemPort));
             log.debug(configInfo("msgQueueLimit", msgQueueLimit));
-            log.debug(configInfo("connTimeout", connTimeout));
-            log.debug(configInfo("maxConnTimeout", maxConnTimeout));
-            log.debug(configInfo("reconCnt", reconCnt));
+
+            if (failureDetectionTimeoutEnabled()) {
+                log.debug(configInfo("connTimeout", connTimeout));
+                log.debug(configInfo("maxConnTimeout", maxConnTimeout));
+                log.debug(configInfo("reconCnt", reconCnt));
+            }
+            else
+                log.debug(configInfo("failureDetectionTimeout", failureDetectionTimeout()));
+
             log.debug(configInfo("sockWriteTimeout", sockWriteTimeout));
             log.debug(configInfo("ackSndThreshold", ackSndThreshold));
             log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize));
@@ -1906,17 +1944,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         long connTimeout0 = connTimeout;
 
+        IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this);
+
         while (true) {
             GridCommunicationClient client;
 
             try {
                 client = new GridShmemCommunicationClient(metricsLsnr,
                     port,
-                    connTimeout,
+                    timeoutHelper.nextTimeoutChunk(connTimeout),
                     log,
                     getSpiContext().messageFormatter());
             }
             catch (IgniteCheckedException e) {
+                if (timeoutHelper.checkFailureTimeoutReached(e))
+                    throw e;
+
                 // Reconnect for the second time, if connection is not established.
                 if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) {
                     connectAttempts++;
@@ -1928,15 +1971,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             try {
-                safeHandshake(client, null, node.id(), connTimeout0);
+                safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
             }
-            catch (HandshakeTimeoutException e) {
+            catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
+                client.forceClose();
+
+                if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
+                    timeoutHelper.checkFailureTimeoutReached(e))) {
+                    log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" +
+                        failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
+
+                    throw e;
+                }
+
+                assert !failureDetectionTimeoutEnabled();
+
                 if (log.isDebugEnabled())
-                    log.debug("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
+                    log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
                         ", err=" + e.getMessage() + ", client=" + client + ']');
 
-                client.forceClose();
-
                 if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
                     if (log.isDebugEnabled())
                         log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
@@ -2050,6 +2103,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
             int attempt = 1;
 
+            IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this);
+
             while (!conn) { // Reconnection on handshake timeout.
                 try {
                     SocketChannel ch = SocketChannel.open();
@@ -2076,9 +2131,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     long rcvCnt = -1;
 
                     try {
-                        ch.socket().connect(addr, (int)connTimeout);
+                        ch.socket().connect(addr, (int)timeoutHelper.nextTimeoutChunk(connTimeout));
 
-                        rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0);
+                        rcvCnt = safeHandshake(ch, recoveryDesc, node.id(),
+                            timeoutHelper.nextTimeoutChunk(connTimeout0));
 
                         if (rcvCnt == -1)
                             return null;
@@ -2112,19 +2168,43 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         }
                     }
                 }
-                catch (HandshakeTimeoutException e) {
+                catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
                     if (client != null) {
                         client.forceClose();
 
                         client = null;
                     }
 
+                    if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
+                        timeoutHelper.checkFailureTimeoutReached(e))) {
+
+                        String msg = "Handshake timed out (failure detection timeout is reached) " +
+                            "[failureDetectionTimeout=" + failureDetectionTimeout() + ", addr=" + addr + ']';
+
+                        onException(msg, e);
+
+                        if (log.isDebugEnabled())
+                            log.debug(msg);
+
+                        if (errs == null)
+                            errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
+                                "Make sure that each GridComputeTask and GridCacheTransaction has a timeout set " +
+                                "in order to prevent parties from waiting forever in case of network issues " +
+                                "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+
+                        errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+
+                        break;
+                    }
+
+                    assert !failureDetectionTimeoutEnabled();
+
                     onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
                         ", addr=" + addr + ']', e);
 
                     if (log.isDebugEnabled())
                         log.debug(
-                            "Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
+                            "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
                                 ", addr=" + addr + ", err=" + e + ']');
 
                     if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
@@ -2164,7 +2244,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (log.isDebugEnabled())
                         log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
 
-                    if (X.hasCause(e, SocketTimeoutException.class))
+                    boolean failureDetThrReached = timeoutHelper.checkFailureTimeoutReached(e);
+
+                    if (failureDetThrReached)
+                        LT.warn(log, null, "Connect timed out (consider increasing 'failureDetectionTimeout' " +
+                            "configuration property) [addr=" + addr + ", failureDetectionTimeout=" +
+                            failureDetectionTimeout() + ']');
+                    else if (X.hasCause(e, SocketTimeoutException.class))
                         LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " +
                             "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']');
 
@@ -2177,7 +2263,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
 
                     // Reconnect for the second time, if connection is not established.
-                    if (connectAttempts < 2 &&
+                    if (!failureDetThrReached && connectAttempts < 2 &&
                         (e instanceof ConnectException || X.hasCause(e, ConnectException.class))) {
                         connectAttempts++;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 572ba2c..12b10b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -480,13 +480,17 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         Collection<Throwable> errs = null;
 
-        long ackTimeout0 = spi.ackTimeout;
+        long ackTimeout0 = spi.getAckTimeout();
+
+        int reconCnt = 0;
 
         int connectAttempts = 1;
 
         UUID locNodeId = getLocalNodeId();
 
-        for (int i = 0; i < spi.reconCnt; i++) {
+        IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+
+        while (true) {
             boolean openSock = false;
 
             Socket sock = null;
@@ -494,7 +498,7 @@ class ClientImpl extends TcpDiscoveryImpl {
             try {
                 long tstamp = U.currentTimeMillis();
 
-                sock = spi.openSocket(addr);
+                sock = spi.openSocket(addr, timeoutHelper);
 
                 openSock = true;
 
@@ -502,7 +506,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 req.client(true);
 
-                spi.writeToSocket(sock, req);
+                spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
 
@@ -532,7 +536,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 msg.client(true);
 
-                spi.writeToSocket(sock, msg);
+                spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
 
@@ -540,7 +544,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                     log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr +
                         ", rmtNodeId=" + rmtNodeId + ']');
 
-                return new T3<>(sock, spi.readReceipt(sock, ackTimeout0), res.clientAck());
+                return new T3<>(sock, spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)),
+                    res.clientAck());
             }
             catch (IOException | IgniteCheckedException e) {
                 U.closeQuiet(sock);
@@ -555,6 +560,12 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 errs.add(e);
 
+                if (timeoutHelper.checkFailureTimeoutReached(e))
+                    break;
+
+                if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
+                    break;
+
                 if (!openSock) {
                     // Reconnect for the second time, if connection is not established.
                     if (connectAttempts < 2) {
@@ -566,7 +577,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                     break; // Don't retry if we can not establish connection.
                 }
 
-                if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
+                if (!spi.failureDetectionTimeoutEnabled() && (e instanceof SocketTimeoutException ||
+                    X.hasCause(e, SocketTimeoutException.class))) {
                     ackTimeout0 *= 2;
 
                     if (!checkAckTimeout(ackTimeout0))
@@ -868,6 +880,9 @@ class ClientImpl extends TcpDiscoveryImpl {
         private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
 
         /** */
+        private final long socketTimeout;
+
+        /** */
         private TcpDiscoveryAbstractMessage unackedMsg;
 
         /**
@@ -875,6 +890,9 @@ class ClientImpl extends TcpDiscoveryImpl {
          */
         protected SocketWriter() {
             super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);
+
+            socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+                spi.getSocketTimeout();
         }
 
         /**
@@ -968,12 +986,13 @@ class ClientImpl extends TcpDiscoveryImpl {
                         }
                     }
 
-                    spi.writeToSocket(sock, msg);
+                    spi.writeToSocket(sock, msg, socketTimeout);
 
                     msg = null;
 
                     if (ack) {
-                        long waitEnd = U.currentTimeMillis() + spi.ackTimeout;
+                        long waitEnd = U.currentTimeMillis() + (spi.failureDetectionTimeoutEnabled() ?
+                            spi.failureDetectionTimeout() : spi.getAckTimeout());
 
                         TcpDiscoveryAbstractMessage unacked;
 
@@ -989,7 +1008,10 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (unacked != null) {
                             if (log.isDebugEnabled())
                                 log.debug("Failed to get acknowledge for message, will try to reconnect " +
-                                "[msg=" + unacked + ", timeout=" + spi.ackTimeout + ']');
+                                "[msg=" + unacked +
+                                (spi.failureDetectionTimeoutEnabled() ?
+                                ", failureDetectionTimeout=" + spi.failureDetectionTimeout() :
+                                ", timeout=" + spi.getAckTimeout()) + ']');
 
                             throw new IOException("Failed to get acknowledge for message: " + unacked);
                         }
@@ -1068,11 +1090,11 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (join) {
                             joinError(new IgniteSpiException("Join process timed out, connection failed and " +
                                 "failed to reconnect (consider increasing 'joinTimeout' configuration property) " +
-                                "[networkTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+                                "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
                         }
                         else
-                            U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
-                                "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
+                            U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout'" +
+                                " configuration  property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
 
                         return;
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index dc343eb..b4f89ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -80,14 +80,6 @@ class ServerImpl extends TcpDiscoveryImpl {
     /** Client message workers. */
     protected ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new ConcurrentHashMap8<>();
 
-    /** Metrics sender. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    private HeartbeatsSender hbsSnd;
-
-    /** Status checker. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    private CheckStatusSender chkStatusSnd;
-
     /** IP finder cleaner. */
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     private IpFinderCleaner ipFinderCleaner;
@@ -229,12 +221,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         spi.stats.onJoinFinished();
 
-        hbsSnd = new HeartbeatsSender();
-        hbsSnd.start();
-
-        chkStatusSnd = new CheckStatusSender();
-        chkStatusSnd.start();
-
         if (spi.ipFinder.isShared()) {
             ipFinderCleaner = new IpFinderCleaner();
             ipFinderCleaner.start();
@@ -278,10 +264,10 @@ class ServerImpl extends TcpDiscoveryImpl {
             msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNode.id()));
 
             synchronized (mux) {
-                long threshold = U.currentTimeMillis() + spi.netTimeout;
-
                 long timeout = spi.netTimeout;
 
+                long threshold = U.currentTimeMillis() + timeout;
+
                 while (spiState != LEFT && timeout > 0) {
                     try {
                         mux.wait(timeout);
@@ -319,12 +305,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         U.interrupt(tmp);
         U.joinThreads(tmp, log);
 
-        U.interrupt(hbsSnd);
-        U.join(hbsSnd, log);
-
-        U.interrupt(chkStatusSnd);
-        U.join(chkStatusSnd, log);
-
         U.interrupt(ipFinderCleaner);
         U.join(ipFinderCleaner, log);
 
@@ -482,6 +462,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         UUID locNodeId = getLocalNodeId();
 
+        IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+
         if (F.contains(spi.locNodeAddrs, addr)) {
             if (clientNodeId == null)
                 return F.t(getLocalNodeId(), false);
@@ -494,7 +476,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             boolean clientPingRes;
 
             try {
-                clientPingRes = clientWorker.ping();
+                clientPingRes = clientWorker.ping(timeoutHelper);
             }
             catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
@@ -517,18 +499,26 @@ class ServerImpl extends TcpDiscoveryImpl {
             try {
                 Socket sock = null;
 
-                for (int i = 0; i < spi.reconCnt; i++) {
+                int reconCnt = 0;
+
+                boolean openedSock = false;
+
+                while (true) {
                     try {
                         if (addr.isUnresolved())
                             addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort());
 
                         long tstamp = U.currentTimeMillis();
 
-                        sock = spi.openSocket(addr);
+                        sock = spi.openSocket(addr, timeoutHelper);
 
-                        spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId));
+                        openedSock = true;
 
-                        TcpDiscoveryPingResponse res = spi.readMessage(sock, null, spi.netTimeout);
+                        spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId),
+                            timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+
+                        TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk(
+                            spi.getAckTimeout()));
 
                         if (locNodeId.equals(res.creatorNodeId())) {
                             if (log.isDebugEnabled())
@@ -550,6 +540,16 @@ class ServerImpl extends TcpDiscoveryImpl {
                             errs = new ArrayList<>();
 
                         errs.add(e);
+
+                        reconCnt++;
+
+                        if (!openedSock && reconCnt == 2)
+                            break;
+
+                        if (timeoutHelper.checkFailureTimeoutReached(e))
+                            break;
+                        else if (!spi.failureDetectionTimeoutEnabled() && reconCnt == spi.getReconnectCount())
+                            break;
                     }
                     finally {
                         U.closeQuiet(sock);
@@ -607,6 +607,12 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override protected void onDataReceived() {
+        if (spi.failureDetectionTimeoutEnabled() && locNode != null)
+            locNode.lastDataReceivedTime(U.currentTimeMillis());
+    }
+
     /**
      * Tries to join this node to topology.
      *
@@ -678,10 +684,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                 log.debug("Join request message has been sent (waiting for coordinator response).");
 
             synchronized (mux) {
-                long threshold = U.currentTimeMillis() + spi.netTimeout;
-
                 long timeout = spi.netTimeout;
 
+                long threshold = U.currentTimeMillis() + timeout;
+
                 while (spiState == CONNECTING && timeout > 0) {
                     try {
                         mux.wait(timeout);
@@ -883,15 +889,19 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         Collection<Throwable> errs = null;
 
-        long ackTimeout0 = spi.ackTimeout;
+        long ackTimeout0 = spi.getAckTimeout();
 
         int connectAttempts = 1;
 
-        boolean joinReqSent = false;
+        boolean joinReqSent;
 
         UUID locNodeId = getLocalNodeId();
 
-        for (int i = 0; i < spi.reconCnt; i++) {
+        IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+
+        int reconCnt = 0;
+
+        while (true){
             // Need to set to false on each new iteration,
             // since remote node may leave in the middle of the first iteration.
             joinReqSent = false;
@@ -903,14 +913,16 @@ class ServerImpl extends TcpDiscoveryImpl {
             try {
                 long tstamp = U.currentTimeMillis();
 
-                sock = spi.openSocket(addr);
+                sock = spi.openSocket(addr, timeoutHelper);
 
                 openSock = true;
 
                 // Handshake.
-                spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
+                spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk(
+                    spi.getSocketTimeout()));
 
-                TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
+                TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk(
+                    ackTimeout0));
 
                 if (locNodeId.equals(res.creatorNodeId())) {
                     if (log.isDebugEnabled())
@@ -924,7 +936,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 // Send message.
                 tstamp = U.currentTimeMillis();
 
-                spi.writeToSocket(sock, msg);
+                spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
 
@@ -941,7 +953,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 // E.g. due to class not found issue.
                 joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage;
 
-                return spi.readReceipt(sock, ackTimeout0);
+                return spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
             }
             catch (ClassCastException e) {
                 // This issue is rarely reproducible on AmazonEC2, but never
@@ -967,6 +979,12 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 errs.add(e);
 
+                if (timeoutHelper.checkFailureTimeoutReached(e))
+                    break;
+
+                if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
+                    break;
+
                 if (!openSock) {
                     // Reconnect for the second time, if connection is not established.
                     if (connectAttempts < 2) {
@@ -978,7 +996,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     break; // Don't retry if we can not establish connection.
                 }
 
-                if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
+                if (!spi.failureDetectionTimeoutEnabled() && (e instanceof SocketTimeoutException ||
+                    X.hasCause(e, SocketTimeoutException.class))) {
                     ackTimeout0 *= 2;
 
                     if (!checkAckTimeout(ackTimeout0))
@@ -1256,12 +1275,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         U.interrupt(tcpSrvr);
         U.join(tcpSrvr, log);
 
-        U.interrupt(hbsSnd);
-        U.join(hbsSnd, log);
-
-        U.interrupt(chkStatusSnd);
-        U.join(chkStatusSnd, log);
-
         U.interrupt(ipFinderCleaner);
         U.join(ipFinderCleaner, log);
 
@@ -1350,8 +1363,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             b.append("Internal threads: ").append(U.nl());
 
             b.append("    Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
-            b.append("    Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl());
-            b.append("    HB sender: ").append(threadStatus(hbsSnd)).append(U.nl());
+
             b.append("    IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl());
             b.append("    Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl());
 
@@ -1398,7 +1410,8 @@ class ServerImpl extends TcpDiscoveryImpl {
     private boolean recordable(TcpDiscoveryAbstractMessage msg) {
         return !(msg instanceof TcpDiscoveryHeartbeatMessage) &&
             !(msg instanceof TcpDiscoveryStatusCheckMessage) &&
-            !(msg instanceof TcpDiscoveryDiscardMessage);
+            !(msg instanceof TcpDiscoveryDiscardMessage) &&
+            !(msg instanceof TcpDiscoveryConnectionCheckMessage);
     }
 
     /**
@@ -1434,112 +1447,6 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /**
-     * Thread that sends heartbeats.
-     */
-    private class HeartbeatsSender extends IgniteSpiThread {
-        /**
-         * Constructor.
-         */
-        private HeartbeatsSender() {
-            super(spi.ignite().name(), "tcp-disco-hb-sender", log);
-
-            setPriority(spi.threadPri);
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("BusyWait")
-        @Override protected void body() throws InterruptedException {
-            while (!isLocalNodeCoordinator())
-                Thread.sleep(1000);
-
-            if (log.isDebugEnabled())
-                log.debug("Heartbeats sender has been started.");
-
-            UUID nodeId = getConfiguredNodeId();
-
-            while (!isInterrupted()) {
-                if (spiStateCopy() != CONNECTED) {
-                    if (log.isDebugEnabled())
-                        log.debug("Stopping heartbeats sender (SPI is not connected to topology).");
-
-                    return;
-                }
-
-                TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(nodeId);
-
-                msg.verify(getLocalNodeId());
-
-                msgWorker.addMessage(msg);
-
-                Thread.sleep(spi.hbFreq);
-            }
-        }
-    }
-
-    /**
-     * Thread that sends status check messages to next node if local node has not
-     * been receiving heartbeats ({@link TcpDiscoveryHeartbeatMessage})
-     * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} *
-     * {@link TcpDiscoverySpi#getHeartbeatFrequency()}.
-     */
-    private class CheckStatusSender extends IgniteSpiThread {
-        /**
-         * Constructor.
-         */
-        private CheckStatusSender() {
-            super(spi.ignite().name(), "tcp-disco-status-check-sender", log);
-
-            setPriority(spi.threadPri);
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("BusyWait")
-        @Override protected void body() throws InterruptedException {
-            if (log.isDebugEnabled())
-                log.debug("Status check sender has been started.");
-
-            // Only 1 heartbeat missing is acceptable. Add 50 ms to avoid false alarm.
-            long checkTimeout = (long)spi.maxMissedHbs * spi.hbFreq + 50;
-
-            long lastSent = 0;
-
-            while (!isInterrupted()) {
-                // 1. Determine timeout.
-                if (lastSent < locNode.lastUpdateTime())
-                    lastSent = locNode.lastUpdateTime();
-
-                long timeout = (lastSent + checkTimeout) - U.currentTimeMillis();
-
-                if (timeout > 0)
-                    Thread.sleep(timeout);
-
-                // 2. Check if SPI is still connected.
-                if (spiStateCopy() != CONNECTED) {
-                    if (log.isDebugEnabled())
-                        log.debug("Stopping status check sender (SPI is not connected to topology).");
-
-                    return;
-                }
-
-                // 3. Was there an update?
-                if (locNode.lastUpdateTime() > lastSent || !ring.hasRemoteNodes()) {
-                    if (log.isDebugEnabled())
-                        log.debug("Skipping status check send " +
-                            "[locNodeLastUpdate=" + U.format(locNode.lastUpdateTime()) +
-                            ", hasRmts=" + ring.hasRemoteNodes() + ']');
-
-                    continue;
-                }
-
-                // 4. Send status check message.
-                lastSent = U.currentTimeMillis();
-
-                msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null));
-            }
-        }
-    }
-
-    /**
      * Thread that cleans IP finder and keeps it in the correct state, unregistering
      * addresses of the nodes that has left the topology.
      * <p>
@@ -1861,10 +1768,49 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Socket. */
         private Socket sock;
 
+        /** Last time status message has been sent. */
+        private long lastTimeStatusMsgSent;
+
+        /** Incoming heartbeats check frequency. */
+        private long hbCheckFreq = (long)spi.maxMissedHbs * spi.hbFreq + 50;
+
+        /** Last time heartbeat message has been sent. */
+        private long lastTimeHbMsgSent;
+
+        /** Time when the last status message has been sent. */
+        private long lastTimeConnCheckMsgSent;
+
+        /** Flag that keeps info on whether the threshold is reached or not. */
+        private boolean failureThresholdReached;
+
+        /** Connection check frequency. */
+        private long connCheckFreq;
+
         /**
          */
         protected RingMessageWorker() {
-            super("tcp-disco-msg-worker");
+            super("tcp-disco-msg-worker", 10);
+
+            initConnectionCheckFrequency();
+        }
+
+        /**
+         * Initializes connection check frequency. Used only when failure detection timeout is enabled.
+         */
+        private void initConnectionCheckFrequency() {
+            if (spi.failureDetectionTimeoutEnabled()) {
+                for (int i = 3; i > 0; i--) {
+                    connCheckFreq = spi.failureDetectionTimeout() / i;
+
+                    if (connCheckFreq > 0)
+                        break;
+                }
+
+                assert connCheckFreq > 0;
+
+                if (log.isDebugEnabled())
+                    log.debug("Connection check frequency is calculated: " + connCheckFreq);
+            }
         }
 
         /**
@@ -1921,9 +1867,25 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (spi.ensured(msg))
                 msgHist.add(msg);
 
+            if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId()))
+                // Reset the flag.
+                failureThresholdReached = false;
+
             spi.stats.onMessageProcessingFinished(msg);
         }
 
+        /** {@inheritDoc} */
+        @Override protected void noMessageLoop() {
+            if (locNode == null)
+                return;
+
+            checkConnection();
+
+            sendHeartbeatMessage();
+
+            checkHeartbeatsReceiving();
+        }
+
         /**
          * Sends message across the ring.
          *
@@ -1990,7 +1952,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                         if (debugMode)
                             debugLog("No next node in topology.");
 
-                        if (ring.hasRemoteNodes()) {
+                        if (ring.hasRemoteNodes() && !(msg instanceof TcpDiscoveryConnectionCheckMessage) &&
+                            !(msg instanceof TcpDiscoveryStatusCheckMessage && msg.creatorNodeId().equals(locNodeId))) {
                             msg.senderNodeId(locNodeId);
 
                             addMessage(msg);
@@ -2027,7 +1990,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 List<InetSocketAddress> locNodeAddrs = U.arrayList(locNode.socketAddresses());
 
                 addr: for (InetSocketAddress addr : spi.getNodeAddresses(next, sameHost)) {
-                    long ackTimeout0 = spi.ackTimeout;
+                    long ackTimeout0 = spi.getAckTimeout();
 
                     if (locNodeAddrs.contains(addr)){
                         if (log.isDebugEnabled())
@@ -2037,8 +2000,15 @@ class ServerImpl extends TcpDiscoveryImpl {
                         continue;
                     }
 
-                    for (int i = 0; i < spi.reconCnt; i++) {
+                    int reconCnt = 0;
+
+                    IgniteSpiOperationTimeoutHelper timeoutHelper = null;
+
+                    while (true) {
                         if (sock == null) {
+                            if (timeoutHelper == null)
+                                timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+
                             nextNodeExists = false;
 
                             boolean success = false;
@@ -2049,14 +2019,16 @@ class ServerImpl extends TcpDiscoveryImpl {
                             try {
                                 long tstamp = U.currentTimeMillis();
 
-                                sock = spi.openSocket(addr);
+                                sock = spi.openSocket(addr, timeoutHelper);
 
                                 openSock = true;
 
                                 // Handshake.
-                                writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
+                                writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId),
+                                    timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
-                                TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
+                                TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
+                                    timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
                                 if (locNodeId.equals(res.creatorNodeId())) {
                                     if (log.isDebugEnabled())
@@ -2140,8 +2112,13 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 if (!openSock)
                                     break; // Don't retry if we can not establish connection.
 
-                                if (e instanceof SocketTimeoutException ||
-                                    X.hasCause(e, SocketTimeoutException.class)) {
+                                if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
+                                    break;
+
+                                if (timeoutHelper.checkFailureTimeoutReached(e))
+                                    break;
+                                else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof
+                                    SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) {
                                     ackTimeout0 *= 2;
 
                                     if (!checkAckTimeout(ackTimeout0))
@@ -2156,9 +2133,13 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                     sock = null;
                                 }
-                                else
+                                else {
                                     // Next node exists and accepts incoming messages.
                                     nextNodeExists = true;
+                                    // Resetting timeout control object to let the code below to use a new one
+                                    // for the next bunch of operations.
+                                    timeoutHelper = null;
+                                }
                             }
                         }
 
@@ -2195,8 +2176,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
                                         pendingMsgs.discardId);
 
+                                    if (timeoutHelper == null)
+                                        timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+
                                     try {
-                                        writeToSocket(sock, pendingMsg);
+                                        writeToSocket(sock, pendingMsg, timeoutHelper.nextTimeoutChunk(
+                                            spi.getSocketTimeout()));
                                     }
                                     finally {
                                         clearNodeAddedMessage(pendingMsg);
@@ -2204,7 +2189,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                     spi.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp);
 
-                                    int res = spi.readReceipt(sock, ackTimeout0);
+                                    int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
                                     if (log.isDebugEnabled())
                                         log.debug("Pending message has been sent to next node [msg=" + msg.id() +
@@ -2215,19 +2200,34 @@ class ServerImpl extends TcpDiscoveryImpl {
                                         debugLog("Pending message has been sent to next node [msg=" + msg.id() +
                                             ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
                                             ", res=" + res + ']');
+
+                                    // Resetting timeout control object to create a new one for the next bunch of
+                                    // operations.
+                                    timeoutHelper = null;
                                 }
                             }
 
-                            prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
+                            if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
+                                if (!next.version().greaterThanEqual(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER,
+                                    TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER,
+                                    TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER))
+                                    // Preserve backward compatibility with nodes of older versions.
+                                    msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
+                            }
+                            else
+                                prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
 
                             try {
                                 long tstamp = U.currentTimeMillis();
 
-                                writeToSocket(sock, msg);
+                                if (timeoutHelper == null)
+                                    timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+
+                                writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                                 spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
 
-                                int res = spi.readReceipt(sock, ackTimeout0);
+                                int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
                                 if (log.isDebugEnabled())
                                     log.debug("Message has been sent to next node [msg=" + msg +
@@ -2262,11 +2262,19 @@ class ServerImpl extends TcpDiscoveryImpl {
                             onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']',
                                 e);
 
-                            if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
-                                ackTimeout0 *= 2;
+                            if (timeoutHelper.checkFailureTimeoutReached(e))
+                                break;
 
-                                if (!checkAckTimeout(ackTimeout0))
+                            if (!spi.failureDetectionTimeoutEnabled()) {
+                                if (++reconCnt == spi.getReconnectCount())
                                     break;
+                                else if (e instanceof SocketTimeoutException ||
+                                    X.hasCause(e, SocketTimeoutException.class)) {
+                                    ackTimeout0 *= 2;
+
+                                    if (!checkAckTimeout(ackTimeout0))
+                                        break;
+                                }
                             }
                         }
                         finally {
@@ -2279,7 +2287,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                 if (log.isDebugEnabled())
                                     log.debug("Message has not been sent [next=" + next.id() + ", msg=" + msg +
-                                        ", i=" + i + ']');
+                                        (!spi.failureDetectionTimeoutEnabled() ? ", i=" + reconCnt : "") + ']');
                             }
                         }
                     } // Try to reconnect.
@@ -3342,7 +3350,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
                 else if (leftNode.equals(next) && sock != null) {
                     try {
-                        writeToSocket(sock, msg);
+                        writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
+                            spi.failureDetectionTimeout() : spi.getSocketTimeout());
 
                         if (log.isDebugEnabled())
                             log.debug("Sent verified node left message to leaving node: " + msg);
@@ -3991,6 +4000,77 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
         }
+
+        /**
+         * Sends heartbeat message if needed.
+         */
+        private void sendHeartbeatMessage() {
+            if (!isLocalNodeCoordinator())
+                return;
+
+            long elapsed = (lastTimeHbMsgSent + spi.hbFreq) - U.currentTimeMillis();
+
+            if (elapsed > 0)
+                return;
+
+            TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getConfiguredNodeId());
+
+            msg.verify(getLocalNodeId());
+
+            msgWorker.addMessage(msg);
+
+            lastTimeHbMsgSent = U.currentTimeMillis();
+        }
+
+        /**
+         * Check the last time a heartbeat message received. If the time is bigger than {@code hbCheckTimeout} than
+         * {@link TcpDiscoveryStatusCheckMessage} is sent accros the ring.
+         */
+        private void checkHeartbeatsReceiving() {
+            if (lastTimeStatusMsgSent < locNode.lastUpdateTime())
+                lastTimeStatusMsgSent = locNode.lastUpdateTime();
+
+            long elapsed = (lastTimeStatusMsgSent + hbCheckFreq) - U.currentTimeMillis();
+
+            if (elapsed > 0)
+                return;
+
+            msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null));
+
+            lastTimeStatusMsgSent = U.currentTimeMillis();
+        }
+
+        /**
+         * Check connection aliveness status.
+         */
+        private void checkConnection() {
+            if (!spi.failureDetectionTimeoutEnabled())
+                return;
+
+            if (!failureThresholdReached && U.currentTimeMillis() - locNode.lastDataReceivedTime()
+                >= spi.failureDetectionTimeout() && ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) {
+
+                log.info("Local node seems to be disconnected from topology (failure detection timeout " +
+                    "is reached): [failureDetectionTimeout=" + spi.failureDetectionTimeout() +
+                    ", connCheckFreq=" + connCheckFreq + ']');
+
+                failureThresholdReached = true;
+
+                // Reset sent time deliberately to force sending connection check message.
+                lastTimeConnCheckMsgSent = 0;
+            }
+
+            long elapsed = (lastTimeConnCheckMsgSent + connCheckFreq) - U.currentTimeMillis();
+
+            if (elapsed > 0)
+                return;
+
+            if (ring.hasRemoteNodes()) {
+                sendMessageAcrossRing(new TcpDiscoveryConnectionCheckMessage(locNode));
+
+                lastTimeConnCheckMsgSent = U.currentTimeMillis();
+            }
+        }
     }
 
     /**
@@ -4186,14 +4266,17 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                             TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId);
 
+                            IgniteSpiOperationTimeoutHelper timeoutHelper =
+                                new IgniteSpiOperationTimeoutHelper(spi);
+
                             if (req.clientNodeId() != null) {
                                 ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId());
 
                                 if (clientWorker != null)
-                                    res.clientExists(clientWorker.ping());
+                                    res.clientExists(clientWorker.ping(timeoutHelper));
                             }
 
-                            spi.writeToSocket(sock, res);
+                            spi.writeToSocket(sock, res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
                         }
                         else if (log.isDebugEnabled())
                             log.debug("Ignore ping request, node is stopping.");
@@ -4214,7 +4297,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     if (req.client())
                         res.clientAck(true);
 
-                    spi.writeToSocket(sock, res);
+                    spi.writeToSocket(sock, res, spi.failureDetectionTimeoutEnabled() ?
+                        spi.failureDetectionTimeout() : spi.getSocketTimeout());
 
                     // It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
                     // the local node sends a handshake request message on the loopback address, so we get here.
@@ -4323,6 +4407,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                     return;
                 }
 
+                long socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+                    spi.getSocketTimeout();
+
                 while (!isInterrupted()) {
                     try {
                         TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
@@ -4337,7 +4424,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                         if (debugMode && recordable(msg))
                             debugLog("Message has been received: " + msg);
 
-                        if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+                        if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
+                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+
+                            continue;
+                        }
+                        else if (msg instanceof TcpDiscoveryJoinRequestMessage) {
                             TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;
 
                             if (!req.responded()) {
@@ -4355,7 +4447,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 TcpDiscoverySpiState state = spiStateCopy();
 
                                 if (state == CONNECTED) {
-                                    spi.writeToSocket(msg, sock, RES_OK);
+                                    spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
 
                                     if (clientMsgWrk.getState() == State.NEW)
                                         clientMsgWrk.start();
@@ -4365,7 +4457,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     continue;
                                 }
                                 else {
-                                    spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN);
+                                    spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, socketTimeout);
 
                                     break;
                                 }
@@ -4373,7 +4465,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(msg, sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
 
                             boolean ignored = false;
 
@@ -4402,7 +4494,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryAuthFailedMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(msg, sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
 
                             boolean ignored = false;
 
@@ -4431,7 +4523,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryCheckFailedMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(msg, sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
 
                             boolean ignored = false;
 
@@ -4460,7 +4552,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(msg, sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
 
                             boolean ignored = false;
 
@@ -4509,7 +4601,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             clientMsgWrk.addMessage(ack);
                         }
                         else
-                            spi.writeToSocket(msg, sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
                     }
                     catch (IgniteCheckedException e) {
                         if (log.isDebugEnabled())
@@ -4610,8 +4702,11 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             TcpDiscoverySpiState state = spiStateCopy();
 
+            long socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+                spi.getSocketTimeout();
+
             if (state == CONNECTED) {
-                spi.writeToSocket(msg, sock, RES_OK);
+                spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
 
                 if (log.isDebugEnabled())
                     log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']');
@@ -4648,7 +4743,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     // Local node is stopping. Remote node should try next one.
                     res = RES_CONTINUE_JOIN;
 
-                spi.writeToSocket(msg, sock, res);
+                spi.writeToSocket(msg, sock, res, socketTimeout);
 
                 if (log.isDebugEnabled())
                     log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']');
@@ -4741,7 +4836,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param clientNodeId Node ID.
          */
         protected ClientMessageWorker(Socket sock, UUID clientNodeId) {
-            super("tcp-disco-client-message-worker");
+            super("tcp-disco-client-message-worker", 2000);
 
             this.sock = sock;
             this.clientNodeId = clientNodeId;
@@ -4791,7 +4886,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                             log.debug("Sending message ack to client [sock=" + sock + ", locNodeId="
                                 + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
 
-                        writeToSocket(sock, msg);
+                        writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
+                            spi.failureDetectionTimeout() : spi.getSocketTimeout());
                     }
                 }
                 else {
@@ -4802,7 +4898,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         prepareNodeAddedMessage(msg, clientNodeId, null, null);
 
-                        writeToSocket(sock, msg);
+                        writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
+                            spi.failureDetectionTimeout() : spi.getSocketTimeout());
                     }
                     finally {
                         clearNodeAddedMessage(msg);
@@ -4836,10 +4933,11 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * @param timeoutHelper Timeout controller.
          * @return Ping result.
          * @throws InterruptedException If interrupted.
          */
-        public boolean ping() throws InterruptedException {
+        public boolean ping(IgniteSpiOperationTimeoutHelper timeoutHelper) throws InterruptedException {
             if (spi.isNodeStopping0())
                 return false;
 
@@ -4865,7 +4963,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             try {
-                return fut.get(spi.ackTimeout, TimeUnit.MILLISECONDS);
+                return fut.get(timeoutHelper.nextTimeoutChunk(spi.getAckTimeout()),
+                    TimeUnit.MILLISECONDS);
             }
             catch (IgniteInterruptedCheckedException ignored) {
                 throw new InterruptedException();
@@ -4904,12 +5003,18 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Backed interrupted flag. */
         private volatile boolean interrupted;
 
+        /** Polling timeout. */
+        private final long pollingTimeout;
+
         /**
          * @param name Thread name.
+         * @param pollingTimeout Messages polling timeout.
          */
-        protected MessageWorkerAdapter(String name) {
+        protected MessageWorkerAdapter(String name, long pollingTimeout) {
             super(spi.ignite().name(), name, log);
 
+            this.pollingTimeout = pollingTimeout;
+
             setPriority(spi.threadPri);
         }
 
@@ -4919,12 +5024,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                 log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']');
 
             while (!isInterrupted()) {
-                TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS);
+                TcpDiscoveryAbstractMessage msg = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS);
 
                 if (msg == null)
-                    continue;
-
-                processMessage(msg);
+                    noMessageLoop();
+                else
+                    processMessage(msg);
             }
         }
 
@@ -4968,16 +5073,24 @@ class ServerImpl extends TcpDiscoveryImpl {
         protected abstract void processMessage(TcpDiscoveryAbstractMessage msg);
 
         /**
+         * Called when there is no message to process giving ability to perform other activity.
+         */
+        protected void noMessageLoop() {
+            // No-op.
+        }
+
+        /**
          * @param sock Socket.
          * @param msg Message.
+         * @param timeout Socket timeout.
          * @throws IOException If IO failed.
          * @throws IgniteCheckedException If marshalling failed.
          */
-        protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
+        protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
             throws IOException, IgniteCheckedException {
             bout.reset();
 
-            spi.writeToSocket(sock, msg, bout);
+            spi.writeToSocket(sock, msg, bout, timeout);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index c271b7c..4dacf45 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -131,6 +131,13 @@ abstract class TcpDiscoveryImpl {
     }
 
     /**
+     * Called when a chunk of data is received from a remote node.
+     */
+    protected void onDataReceived() {
+        // No-op
+    }
+
+    /**
      * @param log Logger.
      */
     public abstract void dumpDebugInfo(IgniteLogger log);
@@ -273,10 +280,10 @@ abstract class TcpDiscoveryImpl {
      * maximum acknowledgement timeout, {@code false} otherwise.
      */
     protected boolean checkAckTimeout(long ackTimeout) {
-        if (ackTimeout > spi.maxAckTimeout) {
+        if (ackTimeout > spi.getMaxAckTimeout()) {
             LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " +
                 "(consider increasing 'maxAckTimeout' configuration property) " +
-                "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.maxAckTimeout + ']');
+                "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.getMaxAckTimeout() + ']');
 
             return false;
         }