You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/15 16:12:02 UTC

[1/3] incubator-ignite git commit: ignite-752: added manualFailureDetection setup flag

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-752 de7e06f3f -> 1b6005a50


ignite-752: added manualFailureDetection setup flag


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1a7421e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1a7421e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1a7421e6

Branch: refs/heads/ignite-752
Commit: 1a7421e67b9fc743c2213e6381f64c9e646030f2
Parents: de7e06f
Author: Denis Magda <dm...@gridgain.com>
Authored: Wed Jul 15 09:15:32 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Jul 15 09:15:32 2015 +0300

----------------------------------------------------------------------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1a7421e6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 7663fe6..4240d6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -221,17 +221,17 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     protected TcpDiscoveryIpFinder ipFinder;
 
     /** Socket operations timeout. */
-    protected long sockTimeout; // Must be initialized in the constructor of child class.
+    private long sockTimeout; // Must be initialized in the constructor of child class.
 
     /** Message acknowledgement timeout. */
-    protected long ackTimeout; // Must be initialized in the constructor of child class.
+    private long ackTimeout; // Must be initialized in the constructor of child class.
 
     /** Network timeout. */
-    protected long netTimeout = DFLT_NETWORK_TIMEOUT;
+    private long netTimeout = DFLT_NETWORK_TIMEOUT;
 
     /** Join timeout. */
     @SuppressWarnings("RedundantFieldInitialization")
-    protected long joinTimeout = DFLT_JOIN_TIMEOUT;
+    private long joinTimeout = DFLT_JOIN_TIMEOUT;
 
     /** Thread priority for all threads started by SPI. */
     protected int threadPri = DFLT_THREAD_PRI;
@@ -283,14 +283,14 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
     /** Reconnect attempts count. */
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
-    protected int reconCnt = DFLT_RECONNECT_CNT;
+    private int reconCnt = DFLT_RECONNECT_CNT;
 
     /** Statistics print frequency. */
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized", "RedundantFieldInitialization"})
     protected long statsPrintFreq = DFLT_STATS_PRINT_FREQ;
 
     /** Maximum message acknowledgement timeout. */
-    protected long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT;
+    private long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT;
 
     /** Max heartbeats count node can miss without initiating status check. */
     protected int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS;
@@ -327,6 +327,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /** */
     private boolean forceSrvMode;
 
+    /** User manually set one of the failure detection timeouts. Failure detection threshold will not be used. */
+    private boolean manualFailureDetectionSetup;
+
     /** {@inheritDoc} */
     @Override public String getSpiState() {
         return impl.getSpiState();
@@ -499,6 +502,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     @IgniteSpiConfiguration(optional = true)
     public TcpDiscoverySpi setReconnectCount(int reconCnt) {
         this.reconCnt = reconCnt;
+        manualFailureDetectionSetup = true;
 
         return this;
     }
@@ -525,6 +529,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     @IgniteSpiConfiguration(optional = true)
     public TcpDiscoverySpi setMaxAckTimeout(long maxAckTimeout) {
         this.maxAckTimeout = maxAckTimeout;
+        manualFailureDetectionSetup = true;
 
         return this;
     }
@@ -697,6 +702,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     @IgniteSpiConfiguration(optional = true)
     public TcpDiscoverySpi setSocketTimeout(long sockTimeout) {
         this.sockTimeout = sockTimeout;
+        manualFailureDetectionSetup = true;
 
         return this;
     }
@@ -714,6 +720,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     @IgniteSpiConfiguration(optional = true)
     public TcpDiscoverySpi setAckTimeout(long ackTimeout) {
         this.ackTimeout = ackTimeout;
+        manualFailureDetectionSetup = true;
 
         return this;
     }
@@ -728,6 +735,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     @IgniteSpiConfiguration(optional = true)
     public TcpDiscoverySpi setNetworkTimeout(long netTimeout) {
         this.netTimeout = netTimeout;
+        manualFailureDetectionSetup = true;
 
         return this;
     }


[3/3] incubator-ignite git commit: ignite-752: keep reusing failure detection threshold

Posted by sb...@apache.org.
ignite-752: keep reusing failure detection threshold


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1b6005a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1b6005a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1b6005a5

Branch: refs/heads/ignite-752
Commit: 1b6005a50584045cc11633541bc7b023ee399b32
Parents: 567aec1
Author: Denis Magda <dm...@gridgain.com>
Authored: Wed Jul 15 17:11:45 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Jul 15 17:11:45 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  61 -----------
 .../IgniteSpiOperationTimeoutController.java    |  93 ++++++++++++++++
 .../spi/IgniteSpiOperationTimeoutException.java |  33 ++++++
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 106 +++++++++++--------
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |   4 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  16 ++-
 6 files changed, 196 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b6005a5/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 82ed3d0..422ce81 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -31,12 +31,10 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.plugin.security.*;
 import org.apache.ignite.resources.*;
-
 import org.jetbrains.annotations.*;
 
 import javax.management.*;
 import java.io.*;
-import java.net.*;
 import java.text.*;
 import java.util.*;
 
@@ -581,51 +579,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
 
     /**
      * TODO: IGNITE-752
-     * @param dfltTimeout
-     * @return
-     */
-    public long firstNetOperationTimeout(long dfltTimeout) {
-        return !failureDetectionThresholdEnabled ? dfltTimeout : failureDetectionThreshold;
-    }
-
-    /**
-     * TODO: IGNITE-752
-     * @param curTimeout
-     * @param lastOperStartTime
-     * @param dfltTimeout
-     * @return
-     * @throws IOException
-     */
-    public long nextNetOperationTimeout(long curTimeout, long lastOperStartTime, long dfltTimeout)
-        throws NetOperationTimeoutException {
-        if (!failureDetectionThresholdEnabled)
-            return dfltTimeout;
-
-        long timeLeft = curTimeout - lastOperStartTime;
-
-        if (timeLeft <= 0)
-            throw new NetOperationTimeoutException("Network operation timed out. Increase failure detection threshold" +
-                " using IgniteConfiguration.setFailureDetectionThreshold() or set SPI specific timeouts manually." +
-                " Current failure detection threshold: " + failureDetectionThreshold);
-
-        return timeLeft;
-    }
-
-    /**
-     * TODO: IGNITE-752
-     * @param e
-     * @return
-     */
-    public boolean checkFailureDetectionThresholdReached(Exception e) {
-        if (!failureDetectionThresholdEnabled)
-            return false;
-
-        return e instanceof NetOperationTimeoutException || e instanceof SocketTimeoutException ||
-            X.hasCause(e, NetOperationTimeoutException.class, SocketException.class);
-    }
-
-    /**
-     * TODO: IGNITE-752
      * @param enabled
      */
     public void failureDetectionThresholdEnabled(boolean enabled) {
@@ -648,20 +601,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         return failureDetectionThreshold;
     }
 
-
-    /**
-     * TODO: IGNITE-752
-     */
-    public static class NetOperationTimeoutException extends IgniteCheckedException {
-        /**
-         * Constructor.
-         * @param msg Error message.
-         */
-        public NetOperationTimeoutException(String msg) {
-            super(msg);
-        }
-    }
-
     /**
      * Temporarily SPI context.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b6005a5/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
new file mode 100644
index 0000000..3ae4fa4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.spi;
+
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * TODO: IGNITE-752
+ */
+public class IgniteSpiOperationTimeoutController {
+    /** */
+    private long lastOperStartTs;
+
+    /** */
+    private long timeout;
+
+    /** */
+    private final boolean failureDetectionThresholdEnabled;
+
+    /** */
+    private final long failureDetectionThreshold;
+
+    /**
+     * Constructor.
+     *
+     * @param adapter SPI adapter.
+     */
+    public IgniteSpiOperationTimeoutController(IgniteSpiAdapter adapter) {
+        failureDetectionThresholdEnabled = adapter.failureDetectionThresholdEnabled();
+        failureDetectionThreshold = adapter.failureDetectionThreshold();
+    }
+
+    /**
+     * TODO: IGNITE-752
+     * @param dfltTimeout
+     * @return
+     * @throws IgniteSpiOperationTimeoutException
+     */
+    public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException {
+        if (!failureDetectionThresholdEnabled)
+            return dfltTimeout;
+
+        if (lastOperStartTs == 0) {
+            timeout = failureDetectionThreshold;
+            lastOperStartTs = U.currentTimeMillis();
+        }
+        else {
+            long curTs = U.currentTimeMillis();
+
+            timeout = timeout - (curTs - lastOperStartTs);
+
+            lastOperStartTs = curTs;
+
+            if (timeout <= 0)
+                throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase failure detection" +
+                    " threshold using IgniteConfiguration.setFailureDetectionThreshold() or set SPI specific timeouts" +
+                    " manually. Current failure detection threshold: " + failureDetectionThreshold);
+        }
+
+        return timeout;
+    }
+
+    /**
+     * TODO: IGNITE-752
+     * @param e
+     * @return
+     */
+    public boolean checkFailureDetectionThresholdReached(Exception e) {
+        if (!failureDetectionThresholdEnabled)
+            return false;
+
+        return e instanceof IgniteSpiOperationTimeoutException || e instanceof SocketTimeoutException ||
+            X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketException.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b6005a5/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
new file mode 100644
index 0000000..c90b45b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi;
+
+import org.apache.ignite.*;
+
+/**
+ * TODO: IGNITE-752
+ */
+public class IgniteSpiOperationTimeoutException extends IgniteCheckedException {
+    /**
+     * Constructor.
+     * @param msg Error message.
+     */
+    public IgniteSpiOperationTimeoutException(String msg) {
+        super(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b6005a5/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 9dd565c..d506507 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -50,7 +50,6 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.*;
 import static org.apache.ignite.spi.IgnitePortProtocol.*;
 import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*;
 import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage.*;
-import static org.apache.ignite.spi.IgniteSpiAdapter.NetOperationTimeoutException;
 
 /**
  *
@@ -279,9 +278,10 @@ class ServerImpl extends TcpDiscoveryImpl {
             msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNode.id()));
 
             synchronized (mux) {
-                long threshold = U.currentTimeMillis() + spi.netTimeout;
+                long timeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
+                    spi.getNetworkTimeout();
 
-                long timeout = spi.netTimeout;
+                long threshold = U.currentTimeMillis() + timeout;
 
                 while (spiState != LEFT && timeout > 0) {
                     try {
@@ -509,8 +509,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             try {
                 Socket sock = null;
 
-                long timeout = 0;
-                long lastOperStartTs = 0;
+                IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
 
                 int reconCnt = 0;
 
@@ -519,22 +518,15 @@ class ServerImpl extends TcpDiscoveryImpl {
                         if (addr.isUnresolved())
                             addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort());
 
-                        timeout = lastOperStartTs == 0 ? spi.firstNetOperationTimeout(spi.getSocketTimeout()) :
-                            spi.nextNetOperationTimeout(timeout, lastOperStartTs, spi.getSocketTimeout());
+                        long tstamp = U.currentTimeMillis();
 
-                        long tstamp = lastOperStartTs = U.currentTimeMillis();
+                        sock = spi.openSocket(addr, timeoutCtrl);
 
-                        sock = spi.openSocket(addr, timeout);
+                        spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId),
+                            timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
 
-                        timeout = spi.nextNetOperationTimeout(timeout, lastOperStartTs, spi.getSocketTimeout());
-                        lastOperStartTs = U.currentTimeMillis();
-
-                        spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId), timeout);
-
-                        timeout = spi.nextNetOperationTimeout(timeout, lastOperStartTs, spi.getNetworkTimeout());
-                        lastOperStartTs = U.currentTimeMillis();
-
-                        TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeout);
+                        TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutCtrl.nextTimeoutChunk(
+                            spi.getNetworkTimeout()));
 
                         if (locNodeId.equals(res.creatorNodeId())) {
                             if (log.isDebugEnabled())
@@ -557,7 +549,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         errs.add(e);
 
-                        if (spi.checkFailureDetectionThresholdReached(e))
+                        if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
                             break;
                         else if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
                             break;
@@ -690,9 +682,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                 log.debug("Join request message has been sent (waiting for coordinator response).");
 
             synchronized (mux) {
-                long threshold = U.currentTimeMillis() + spi.netTimeout;
+                long timeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
+                    spi.getNetworkTimeout();
 
-                long timeout = spi.netTimeout;
+                long threshold = U.currentTimeMillis() + timeout;
 
                 while (spiState == CONNECTING && timeout > 0) {
                     try {
@@ -734,8 +727,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                     LT.warn(log, null, "Node has not been connected to topology and will repeat join process. " +
                         "Check remote nodes logs for possible error messages. " +
                         "Note that large topology may require significant time to start. " +
-                        "Increase 'TcpDiscoverySpi.networkTimeout' configuration property " +
-                        "if getting this message on the starting nodes [networkTimeout=" + spi.netTimeout + ']');
+                        "Increase 'IgniteConfiguration.failureDetectionThreshold' configuration property " +
+                        "if getting this message on the starting nodes [failureDetectionThreshold=" +
+                        spi.failureDetectionThreshold() + ']');
             }
         }
 
@@ -855,10 +849,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                         "(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " +
                         addrs);
 
-                if (spi.joinTimeout > 0) {
+                if (spi.getJoinTimeout() > 0) {
                     if (noResStart == 0)
                         noResStart = U.currentTimeMillis();
-                    else if (U.currentTimeMillis() - noResStart > spi.joinTimeout)
+                    else if (U.currentTimeMillis() - noResStart > spi.getJoinTimeout())
                         throw new IgniteSpiException(
                             "Failed to connect to any address from IP finder within join timeout " +
                                 "(make sure IP finder addresses are correct, and operating system firewalls are disabled " +
@@ -895,15 +889,17 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         Collection<Throwable> errs = null;
 
-        long ackTimeout0 = spi.ackTimeout;
+        long ackTimeout0 = spi.getAckTimeout();
 
         int connectAttempts = 1;
 
-        boolean joinReqSent = false;
+        boolean joinReqSent;
 
         UUID locNodeId = getLocalNodeId();
 
-        for (int i = 0; i < spi.reconCnt; i++) {
+        IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+
+        while (true){
             // Need to set to false on each new iteration,
             // since remote node may leave in the middle of the first iteration.
             joinReqSent = false;
@@ -915,14 +911,16 @@ class ServerImpl extends TcpDiscoveryImpl {
             try {
                 long tstamp = U.currentTimeMillis();
 
-                sock = spi.openSocket(addr);
+                sock = spi.openSocket(addr, timeoutCtrl);
 
                 openSock = true;
 
                 // Handshake.
-                spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
+                spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutCtrl.nextTimeoutChunk(
+                    spi.getSocketTimeout()));
 
-                TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
+                TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutCtrl.nextTimeoutChunk(
+                    ackTimeout0));
 
                 if (locNodeId.equals(res.creatorNodeId())) {
                     if (log.isDebugEnabled())
@@ -936,7 +934,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 // Send message.
                 tstamp = U.currentTimeMillis();
 
-                spi.writeToSocket(sock, msg);
+                spi.writeToSocket(sock, msg, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
 
@@ -953,7 +951,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 // E.g. due to class not found issue.
                 joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage;
 
-                return spi.readReceipt(sock, ackTimeout0);
+                return spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0));
             }
             catch (ClassCastException e) {
                 // This issue is rarely reproducible on AmazonEC2, but never
@@ -979,6 +977,9 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 errs.add(e);
 
+                if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+                    break;
+
                 if (!openSock) {
                     // Reconnect for the second time, if connection is not established.
                     if (connectAttempts < 2) {
@@ -990,7 +991,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     break; // Don't retry if we can not establish connection.
                 }
 
-                if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
+                if (!spi.failureDetectionThresholdEnabled() && (e instanceof SocketTimeoutException ||
+                    X.hasCause(e, SocketTimeoutException.class))) {
                     ackTimeout0 *= 2;
 
                     if (!checkAckTimeout(ackTimeout0))
@@ -2029,7 +2031,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 List<InetSocketAddress> locNodeAddrs = U.arrayList(locNode.socketAddresses());
 
                 addr: for (InetSocketAddress addr : spi.getNodeAddresses(next, sameHost)) {
-                    long ackTimeout0 = spi.ackTimeout;
+                    long ackTimeout0 = spi.getAckTimeout();
 
                     if (locNodeAddrs.contains(addr)){
                         if (log.isDebugEnabled())
@@ -2039,8 +2041,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                         continue;
                     }
 
-                    for (int i = 0; i < spi.reconCnt; i++) {
+                    while (true) {
                         if (sock == null) {
+                            IgniteSpiOperationTimeoutController timeoutCrt =
+                                new IgniteSpiOperationTimeoutController(spi);
+
                             nextNodeExists = false;
 
                             boolean success = false;
@@ -2051,14 +2056,16 @@ class ServerImpl extends TcpDiscoveryImpl {
                             try {
                                 long tstamp = U.currentTimeMillis();
 
-                                sock = spi.openSocket(addr);
+                                sock = spi.openSocket(addr, timeoutCrt);
 
                                 openSock = true;
 
                                 // Handshake.
-                                writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
+                                writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId),
+                                    timeoutCrt.nextTimeoutChunk(spi.getNetworkTimeout()));
 
-                                TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
+                                TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
+                                    timeoutCrt.nextTimeoutChunk(ackTimeout0));
 
                                 if (locNodeId.equals(res.creatorNodeId())) {
                                     if (log.isDebugEnabled())
@@ -2142,8 +2149,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 if (!openSock)
                                     break; // Don't retry if we can not establish connection.
 
-                                if (e instanceof SocketTimeoutException ||
-                                    X.hasCause(e, SocketTimeoutException.class)) {
+                                if (timeoutCrt.checkFailureDetectionThresholdReached(e))
+                                    break;
+                                else if (!spi.failureDetectionThresholdEnabled() && (e instanceof
+                                    SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) {
                                     ackTimeout0 *= 2;
 
                                     if (!checkAckTimeout(ackTimeout0))
@@ -2173,6 +2182,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                             assert !forceSndPending || msg instanceof TcpDiscoveryNodeLeftMessage;
 
+                            IgniteSpiOperationTimeoutController timeoutCtrl;
+
                             if (failure || forceSndPending) {
                                 if (log.isDebugEnabled())
                                     log.debug("Pending messages will be sent [failure=" + failure +
@@ -2185,6 +2196,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 boolean skip = pendingMsgs.discardId != null;
 
                                 for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
+                                    timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+
                                     if (skip) {
                                         if (pendingMsg.id().equals(pendingMsgs.discardId))
                                             skip = false;
@@ -2198,7 +2211,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                                         pendingMsgs.discardId);
 
                                     try {
-                                        writeToSocket(sock, pendingMsg);
+                                        writeToSocket(sock, pendingMsg, timeoutCtrl.nextTimeoutChunk(
+                                            spi.getSocketTimeout()));
                                     }
                                     finally {
                                         clearNodeAddedMessage(pendingMsg);
@@ -2206,7 +2220,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                     spi.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp);
 
-                                    int res = spi.readReceipt(sock, ackTimeout0);
+                                    int res = spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0));
 
                                     if (log.isDebugEnabled())
                                         log.debug("Pending message has been sent to next node [msg=" + msg.id() +
@@ -2222,6 +2236,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                             prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
 
+                            timeoutCtrl
                             try {
                                 long tstamp = U.currentTimeMillis();
 
@@ -4903,14 +4918,15 @@ class ServerImpl extends TcpDiscoveryImpl {
         /**
          * @param sock Socket.
          * @param msg Message.
+         * @param timeout Socket timeout.
          * @throws IOException If IO failed.
          * @throws IgniteCheckedException If marshalling failed.
          */
-        protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
+        protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
             throws IOException, IgniteCheckedException {
             bout.reset();
 
-            spi.writeToSocket(sock, msg, bout);
+            spi.writeToSocket(sock, msg, bout, timeout);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b6005a5/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index ace917f..c4278ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -266,10 +266,10 @@ abstract class TcpDiscoveryImpl {
      * maximum acknowledgement timeout, {@code false} otherwise.
      */
     protected boolean checkAckTimeout(long ackTimeout) {
-        if (ackTimeout > spi.maxAckTimeout) {
+        if (ackTimeout > spi.getMaxAckTimeout()) {
             LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " +
                 "(consider increasing 'maxAckTimeout' configuration property) " +
-                "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.maxAckTimeout + ']');
+                "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.getMaxAckTimeout() + ']');
 
             return false;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b6005a5/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index e5d5cd6..126bf03 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -676,7 +676,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /**
      * Sets IP finder for IP addresses sharing and storing.
      * <p>
-     * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default.
+     * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will
+     * be used by default.
      *
      * @param ipFinder IP finder.
      */
@@ -1103,12 +1104,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
     /**
      * @param sockAddr Remote address.
-     * @param timeout Socket opening timeout.
+     * @param timeoutCtrl Timeout controller.
      * @return Opened socket.
      * @throws IOException If failed.
      */
-    protected Socket openSocket(InetSocketAddress sockAddr, long timeout) throws IOException,
-        NetOperationTimeoutException {
+    protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutController timeoutCtrl)
+        throws IOException, IgniteSpiOperationTimeoutException {
         assert sockAddr != null;
 
         InetSocketAddress resolved = sockAddr.isUnresolved() ?
@@ -1126,11 +1127,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
         long startTs = U.currentTimeMillis();
 
-        sock.connect(resolved, (int)timeout);
+        sock.connect(resolved, (int)timeoutCtrl.nextTimeoutChunk(sockTimeout));
 
-        timeout = nextNetOperationTimeout(timeout, startTs, sockTimeout);
-
-        writeToSocket(sock, U.IGNITE_HEADER, timeout);
+        writeToSocket(sock, U.IGNITE_HEADER, timeoutCtrl.nextTimeoutChunk(sockTimeout));
 
         return sock;
     }
@@ -1148,7 +1147,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         assert sock != null;
         assert data != null;
 
-        //SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
         SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
 
         addTimeoutObject(obj);


[2/3] incubator-ignite git commit: ignite-752: reimplemented pingNode with failure detection threshold

Posted by sb...@apache.org.
ignite-752: reimplemented pingNode with failure detection threshold


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/567aec10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/567aec10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/567aec10

Branch: refs/heads/ignite-752
Commit: 567aec1029a6302016e3fe189f5a973014b54d94
Parents: 1a7421e
Author: Denis Magda <dm...@gridgain.com>
Authored: Wed Jul 15 12:29:49 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Jul 15 12:29:49 2015 +0300

----------------------------------------------------------------------
 .../configuration/IgniteConfiguration.java      |  25 ++++-
 .../org/apache/ignite/spi/IgniteSpiAdapter.java | 103 ++++++++++++++++++
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  30 +++++-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 108 +++++++++++--------
 4 files changed, 217 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/567aec10/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 2d36c7a..b3d2bfc 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -189,6 +189,9 @@ public class IgniteConfiguration {
     /** Default value for cache sanity check enabled flag. */
     public static final boolean DFLT_CACHE_SANITY_CHECK_ENABLED = true;
 
+    /** Default failure detection threshold used by DiscoverySpi and CommunicationSpi in millis. */
+    public static final int DFLT_FAILURE_DETECTION_THRESHOLD = 10_000;
+
     /** Optional grid name. */
     private String gridName;
 
@@ -366,6 +369,9 @@ public class IgniteConfiguration {
     /** Port number range for time server. */
     private int timeSrvPortRange = DFLT_TIME_SERVER_PORT_RANGE;
 
+    /** Failure detection threshold used by DiscoverySpi and CommunicationSpi. */
+    private int failureDetectionThreshold = DFLT_FAILURE_DETECTION_THRESHOLD;
+
     /** Property names to include into node attributes. */
     private String[] includeProps;
 
@@ -444,7 +450,7 @@ public class IgniteConfiguration {
         clockSyncSamples = cfg.getClockSyncSamples();
         deployMode = cfg.getDeploymentMode();
         discoStartupDelay = cfg.getDiscoveryStartupDelay();
-        pubPoolSize = cfg.getPublicThreadPoolSize();
+        failureDetectionThreshold = cfg.getFailureDetectionThreshold();
         ggHome = cfg.getIgniteHome();
         ggWork = cfg.getWorkDirectory();
         gridName = cfg.getGridName();
@@ -474,6 +480,7 @@ public class IgniteConfiguration {
         p2pMissedCacheSize = cfg.getPeerClassLoadingMissedResourcesCacheSize();
         p2pPoolSize = cfg.getPeerClassLoadingThreadPoolSize();
         pluginCfgs = cfg.getPluginConfigurations();
+        pubPoolSize = cfg.getPublicThreadPoolSize();
         segChkFreq = cfg.getSegmentCheckFrequency();
         segPlc = cfg.getSegmentationPolicy();
         segResolveAttempts = cfg.getSegmentationResolveAttempts();
@@ -1629,6 +1636,22 @@ public class IgniteConfiguration {
     }
 
     /**
+     * TODO: IGNITE-752
+     * @return
+     */
+    public int getFailureDetectionThreshold() {
+        return failureDetectionThreshold;
+    }
+
+    /**
+     * TODO: IGNITE-752
+     * @param failureDetectionThreshold
+     */
+    public void setFailureDetectionThreshold(int failureDetectionThreshold) {
+        this.failureDetectionThreshold = failureDetectionThreshold;
+    }
+
+    /**
      * Should return fully configured load balancing SPI implementation. If not provided,
      * {@link RoundRobinLoadBalancingSpi} will be used.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/567aec10/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 5e557bd..82ed3d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
@@ -35,6 +36,7 @@ import org.jetbrains.annotations.*;
 
 import javax.management.*;
 import java.io.*;
+import java.net.*;
 import java.text.*;
 import java.util.*;
 
@@ -73,6 +75,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
     /** Discovery listener. */
     private GridLocalEventListener paramsLsnr;
 
+    /** Whether the failure detection threshold is used instead of SPI-specific timeouts. */
+    private boolean failureDetectionThresholdEnabled = true;
+
+    /**
+     * Failure detection threshold. Initialized with the value of
+     * {@link IgniteConfiguration#getFailureDetectionThreshold()}.
+     */
+    private long failureDetectionThreshold;
+
     /**
      * Creates new adapter and initializes it from the current (this) class.
      * SPI name will be initialized to the simple name of the class
@@ -194,6 +205,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         spiCtx = new GridDummySpiContext(locNode, true, spiCtx);
     }
 
+    /** {@inheritDoc} */
+    @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
+        if (!failureDetectionThresholdEnabled) {
+            failureDetectionThreshold = ignite.configuration().getFailureDetectionThreshold();
+
+            assertParameter(failureDetectionThreshold > 0, "failureDetectionThreshold > 0");
+        }
+    }
+
     /**
      * Inject ignite instance.
      */
@@ -560,6 +580,89 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
     }
 
     /**
+     * TODO: IGNITE-752
+     * @param dfltTimeout
+     * @return
+     */
+    public long firstNetOperationTimeout(long dfltTimeout) {
+        return !failureDetectionThresholdEnabled ? dfltTimeout : failureDetectionThreshold;
+    }
+
+    /**
+     * TODO: IGNITE-752
+     * @param curTimeout
+     * @param lastOperStartTime
+     * @param dfltTimeout
+     * @return
+     * @throws IOException
+     */
+    public long nextNetOperationTimeout(long curTimeout, long lastOperStartTime, long dfltTimeout)
+        throws NetOperationTimeoutException {
+        if (!failureDetectionThresholdEnabled)
+            return dfltTimeout;
+
+        long timeLeft = curTimeout - lastOperStartTime;
+
+        if (timeLeft <= 0)
+            throw new NetOperationTimeoutException("Network operation timed out. Increase failure detection threshold" +
+                " using IgniteConfiguration.setFailureDetectionThreshold() or set SPI specific timeouts manually." +
+                " Current failure detection threshold: " + failureDetectionThreshold);
+
+        return timeLeft;
+    }
+
+    /**
+     * TODO: IGNITE-752
+     * @param e
+     * @return
+     */
+    public boolean checkFailureDetectionThresholdReached(Exception e) {
+        if (!failureDetectionThresholdEnabled)
+            return false;
+
+        return e instanceof NetOperationTimeoutException || e instanceof SocketTimeoutException ||
+            X.hasCause(e, NetOperationTimeoutException.class, SocketException.class);
+    }
+
+    /**
+     * TODO: IGNITE-752
+     * @param enabled
+     */
+    public void failureDetectionThresholdEnabled(boolean enabled) {
+        failureDetectionThresholdEnabled = enabled;
+    }
+
+    /**
+     * TODO: IGNITE-752
+     * @return
+     */
+    public boolean failureDetectionThresholdEnabled() {
+        return failureDetectionThresholdEnabled;
+    }
+
+    /**
+     * TODO: IGNITE-752
+     * @return
+     */
+    public long failureDetectionThreshold() {
+        return failureDetectionThreshold;
+    }
+
+
+    /**
+     * TODO: IGNITE-752
+     */
+    public static class NetOperationTimeoutException extends IgniteCheckedException {
+        /**
+         * Constructor.
+         * @param msg Error message.
+         */
+        public NetOperationTimeoutException(String msg) {
+            super(msg);
+        }
+    }
+
+    /**
      * Temporarily SPI context.
      */
     private class GridDummySpiContext implements IgniteSpiContext {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/567aec10/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index d51293e..9dd565c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -50,6 +50,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.*;
 import static org.apache.ignite.spi.IgnitePortProtocol.*;
 import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*;
 import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage.*;
+import static org.apache.ignite.spi.IgniteSpiAdapter.NetOperationTimeoutException;
 
 /**
  *
@@ -508,18 +509,32 @@ class ServerImpl extends TcpDiscoveryImpl {
             try {
                 Socket sock = null;
 
-                for (int i = 0; i < spi.reconCnt; i++) {
+                long timeout = 0;
+                long lastOperStartTs = 0;
+
+                int reconCnt = 0;
+
+                while (true) {
                     try {
                         if (addr.isUnresolved())
                             addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort());
 
-                        long tstamp = U.currentTimeMillis();
+                        timeout = lastOperStartTs == 0 ? spi.firstNetOperationTimeout(spi.getSocketTimeout()) :
+                            spi.nextNetOperationTimeout(timeout, lastOperStartTs, spi.getSocketTimeout());
+
+                        long tstamp = lastOperStartTs = U.currentTimeMillis();
+
+                        sock = spi.openSocket(addr, timeout);
 
-                        sock = spi.openSocket(addr);
+                        timeout = spi.nextNetOperationTimeout(timeout, lastOperStartTs, spi.getSocketTimeout());
+                        lastOperStartTs = U.currentTimeMillis();
 
-                        spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId));
+                        spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId), timeout);
 
-                        TcpDiscoveryPingResponse res = spi.readMessage(sock, null, spi.netTimeout);
+                        timeout = spi.nextNetOperationTimeout(timeout, lastOperStartTs, spi.getNetworkTimeout());
+                        lastOperStartTs = U.currentTimeMillis();
+
+                        TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeout);
 
                         if (locNodeId.equals(res.creatorNodeId())) {
                             if (log.isDebugEnabled())
@@ -541,6 +556,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                             errs = new ArrayList<>();
 
                         errs.add(e);
+
+                        if (spi.checkFailureDetectionThresholdReached(e))
+                            break;
+                        else if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
+                            break;
                     }
                     finally {
                         U.closeQuiet(sock);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/567aec10/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 4240d6a..e5d5cd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -327,9 +327,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /** */
     private boolean forceSrvMode;
 
-    /** User manually set one of the failure detection timeouts. Failure detection threshold will not be used. */
-    private boolean manualFailureDetectionSetup;
-
     /** {@inheritDoc} */
     @Override public String getSpiState() {
         return impl.getSpiState();
@@ -502,7 +499,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     @IgniteSpiConfiguration(optional = true)
     public TcpDiscoverySpi setReconnectCount(int reconCnt) {
         this.reconCnt = reconCnt;
-        manualFailureDetectionSetup = true;
+
+        failureDetectionThresholdEnabled(false);
 
         return this;
     }
@@ -529,7 +527,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     @IgniteSpiConfiguration(optional = true)
     public TcpDiscoverySpi setMaxAckTimeout(long maxAckTimeout) {
         this.maxAckTimeout = maxAckTimeout;
-        manualFailureDetectionSetup = true;
+
+        failureDetectionThresholdEnabled(false);
 
         return this;
     }
@@ -702,7 +701,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     @IgniteSpiConfiguration(optional = true)
     public TcpDiscoverySpi setSocketTimeout(long sockTimeout) {
         this.sockTimeout = sockTimeout;
-        manualFailureDetectionSetup = true;
+
+        failureDetectionThresholdEnabled(false);
 
         return this;
     }
@@ -720,7 +720,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     @IgniteSpiConfiguration(optional = true)
     public TcpDiscoverySpi setAckTimeout(long ackTimeout) {
         this.ackTimeout = ackTimeout;
-        manualFailureDetectionSetup = true;
+
+        failureDetectionThresholdEnabled(false);
 
         return this;
     }
@@ -735,7 +736,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     @IgniteSpiConfiguration(optional = true)
     public TcpDiscoverySpi setNetworkTimeout(long netTimeout) {
         this.netTimeout = netTimeout;
-        manualFailureDetectionSetup = true;
+
+        failureDetectionThresholdEnabled(false);
 
         return this;
     }
@@ -1101,10 +1103,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
     /**
      * @param sockAddr Remote address.
+     * @param timeout Socket opening timeout.
      * @return Opened socket.
      * @throws IOException If failed.
      */
-    protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
+    protected Socket openSocket(InetSocketAddress sockAddr, long timeout) throws IOException,
+        NetOperationTimeoutException {
         assert sockAddr != null;
 
         InetSocketAddress resolved = sockAddr.isUnresolved() ?
@@ -1120,9 +1124,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
         sock.setTcpNoDelay(true);
 
-        sock.connect(resolved, (int)sockTimeout);
+        long startTs = U.currentTimeMillis();
+
+        sock.connect(resolved, (int)timeout);
+
+        timeout = nextNetOperationTimeout(timeout, startTs, sockTimeout);
 
-        writeToSocket(sock, U.IGNITE_HEADER);
+        writeToSocket(sock, U.IGNITE_HEADER, timeout);
 
         return sock;
     }
@@ -1132,14 +1140,16 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      *
      * @param sock Socket.
      * @param data Raw data to write.
+     * @param timeout Socket write timeout.
      * @throws IOException If IO failed or write timed out.
      */
     @SuppressWarnings("ThrowFromFinallyBlock")
-    protected void writeToSocket(Socket sock, byte[] data) throws IOException {
+    private void writeToSocket(Socket sock, byte[] data, long timeout) throws IOException {
         assert sock != null;
         assert data != null;
 
-        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
+        //SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
+        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
 
         addTimeoutObject(obj);
 
@@ -1175,11 +1185,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      *
      * @param sock Socket.
      * @param msg Message.
+     * @param timeout Socket write timeout.
      * @throws IOException If IO failed or write timed out.
      * @throws IgniteCheckedException If marshalling failed.
      */
-    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException, IgniteCheckedException {
-        writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024)); // 8K.
+    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
+        throws IOException, IgniteCheckedException {
+        writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024), timeout); // 8K.
     }
 
     /**
@@ -1192,8 +1204,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * @throws IgniteCheckedException If marshalling failed.
      */
     @SuppressWarnings("ThrowFromFinallyBlock")
-    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout)
-        throws IOException, IgniteCheckedException {
+    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout,
+        long timeout) throws IOException, IgniteCheckedException {
         assert sock != null;
         assert msg != null;
         assert bout != null;
@@ -1201,7 +1213,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         // Marshall message first to perform only write after.
         marsh.marshal(msg, bout);
 
-        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
+        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
 
         addTimeoutObject(obj);
 
@@ -1548,39 +1560,43 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
     /** {@inheritDoc} */
     @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
-        if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) {
-            if (ackTimeout == 0)
-                ackTimeout = DFLT_ACK_TIMEOUT_CLIENT;
+        super.spiStart(gridName);
 
-            if (sockTimeout == 0)
-                sockTimeout = DFLT_SOCK_TIMEOUT_CLIENT;
+        if (!failureDetectionThresholdEnabled()) {
+            if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) {
+                if (ackTimeout == 0)
+                    ackTimeout = DFLT_ACK_TIMEOUT_CLIENT;
 
-            impl = new ClientImpl(this);
+                if (sockTimeout == 0)
+                    sockTimeout = DFLT_SOCK_TIMEOUT_CLIENT;
 
-            ctxInitLatch.countDown();
-        }
-        else {
-            if (ackTimeout == 0)
-                ackTimeout = DFLT_ACK_TIMEOUT;
+                impl = new ClientImpl(this);
+
+                ctxInitLatch.countDown();
+            } else {
+                if (ackTimeout == 0)
+                    ackTimeout = DFLT_ACK_TIMEOUT;
 
-            if (sockTimeout == 0)
-                sockTimeout = DFLT_SOCK_TIMEOUT;
+                if (sockTimeout == 0)
+                    sockTimeout = DFLT_SOCK_TIMEOUT;
 
-            impl = new ServerImpl(this);
+                impl = new ServerImpl(this);
+            }
+
+            assertParameter(netTimeout > 0, "networkTimeout > 0");
+            assertParameter(sockTimeout > 0, "sockTimeout > 0");
+            assertParameter(ackTimeout > 0, "ackTimeout > 0");
+            assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout");
+            assertParameter(reconCnt > 0, "reconnectCnt > 0");
         }
 
         assertParameter(ipFinder != null, "ipFinder != null");
         assertParameter(hbFreq > 0, "heartbeatFreq > 0");
-        assertParameter(netTimeout > 0, "networkTimeout > 0");
-        assertParameter(sockTimeout > 0, "sockTimeout > 0");
-        assertParameter(ackTimeout > 0, "ackTimeout > 0");
 
         assertParameter(ipFinderCleanFreq > 0, "ipFinderCleanFreq > 0");
         assertParameter(locPort > 1023, "localPort > 1023");
         assertParameter(locPortRange >= 0, "localPortRange >= 0");
         assertParameter(locPort + locPortRange <= 0xffff, "locPort + locPortRange <= 0xffff");
-        assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout");
-        assertParameter(reconCnt > 0, "reconnectCnt > 0");
         assertParameter(maxMissedHbs > 0, "maxMissedHeartbeats > 0");
         assertParameter(maxMissedClientHbs > 0, "maxMissedClientHeartbeats > 0");
         assertParameter(threadPri > 0, "threadPri > 0");
@@ -1598,11 +1614,17 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             log.debug(configInfo("localPort", locPort));
             log.debug(configInfo("localPortRange", locPortRange));
             log.debug(configInfo("threadPri", threadPri));
-            log.debug(configInfo("networkTimeout", netTimeout));
-            log.debug(configInfo("sockTimeout", sockTimeout));
-            log.debug(configInfo("ackTimeout", ackTimeout));
-            log.debug(configInfo("maxAckTimeout", maxAckTimeout));
-            log.debug(configInfo("reconnectCount", reconCnt));
+
+            if (!failureDetectionThresholdEnabled()) {
+                log.debug(configInfo("networkTimeout", netTimeout));
+                log.debug(configInfo("sockTimeout", sockTimeout));
+                log.debug(configInfo("ackTimeout", ackTimeout));
+                log.debug(configInfo("maxAckTimeout", maxAckTimeout));
+                log.debug(configInfo("reconnectCount", reconCnt));
+            }
+            else
+                log.debug(configInfo("failureDetectionThreshold", failureDetectionThreshold()));
+
             log.debug(configInfo("ipFinder", ipFinder));
             log.debug(configInfo("ipFinderCleanFreq", ipFinderCleanFreq));
             log.debug(configInfo("heartbeatFreq", hbFreq));
@@ -1611,7 +1633,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         }
 
         // Warn on odd network timeout.
-        if (netTimeout < 3000)
+        if (!failureDetectionThresholdEnabled() && netTimeout < 3000)
             U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout);
 
         registerMBean(gridName, this, TcpDiscoverySpiMBean.class);