You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/16 15:22:59 UTC

[3/4] incubator-ignite git commit: ignite-752: failure detection threshold support in TcpCommunicationSpi

ignite-752: failure detection threshold support in TcpCommunicationSpi


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0949c93f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0949c93f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0949c93f

Branch: refs/heads/ignite-752
Commit: 0949c93fe1d1dd435b906fb1d79263e9073c0f1b
Parents: c3c0ef8
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Jul 16 13:25:48 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Jul 16 13:25:48 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  20 ++--
 .../IgniteSpiOperationTimeoutController.java    |   7 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 106 +++++++++++++++----
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   2 +-
 4 files changed, 102 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 422ce81..500c461 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -203,15 +203,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         spiCtx = new GridDummySpiContext(locNode, true, spiCtx);
     }
 
-    /** {@inheritDoc} */
-    @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
-        if (!failureDetectionThresholdEnabled) {
-            failureDetectionThreshold = ignite.configuration().getFailureDetectionThreshold();
-
-            assertParameter(failureDetectionThreshold > 0, "failureDetectionThreshold > 0");
-        }
-    }
-
     /**
      * Inject ignite instance.
      */
@@ -579,6 +570,17 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
 
     /**
      * TODO: IGNITE-752
+     */
+    protected void initFailureDetectionThreshold() {
+        if (failureDetectionThresholdEnabled) {
+            failureDetectionThreshold = ignite.configuration().getFailureDetectionThreshold();
+
+            assertParameter(failureDetectionThreshold > 0, "failureDetectionThreshold > 0");
+        }
+    }
+
+    /**
+     * TODO: IGNITE-752
      * @param enabled
      */
     public void failureDetectionThresholdEnabled(boolean enabled) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
index 3ae4fa4..ba95871 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
@@ -46,6 +46,9 @@ public class IgniteSpiOperationTimeoutController {
     public IgniteSpiOperationTimeoutController(IgniteSpiAdapter adapter) {
         failureDetectionThresholdEnabled = adapter.failureDetectionThresholdEnabled();
         failureDetectionThreshold = adapter.failureDetectionThreshold();
+
+        assert !failureDetectionThresholdEnabled || failureDetectionThreshold > 0 : " [failureDetectionThreshold=" +
+            failureDetectionThreshold + ", thresholdEnabled=" + failureDetectionThresholdEnabled + ']';
     }
 
     /**
@@ -70,8 +73,8 @@ public class IgniteSpiOperationTimeoutController {
             lastOperStartTs = curTs;
 
             if (timeout <= 0)
-                throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase failure detection" +
-                    " threshold using IgniteConfiguration.setFailureDetectionThreshold() or set SPI specific timeouts" +
+                throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " +
+                    "'failureDetectionThreshold' configuration property or set SPI specific timeouts" +
                     " manually. Current failure detection threshold: " + failureDetectionThreshold);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 4ca2995..b2bc9eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -970,6 +970,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     @IgniteSpiConfiguration(optional = true)
     public void setConnectTimeout(long connTimeout) {
         this.connTimeout = connTimeout;
+
+        failureDetectionThresholdEnabled(false);
     }
 
     /** {@inheritDoc} */
@@ -992,6 +994,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     @IgniteSpiConfiguration(optional = true)
     public void setMaxConnectTimeout(long maxConnTimeout) {
         this.maxConnTimeout = maxConnTimeout;
+
+        failureDetectionThresholdEnabled(false);
     }
 
     /** {@inheritDoc} */
@@ -1010,6 +1014,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     @IgniteSpiConfiguration(optional = true)
     public void setReconnectCount(int reconCnt) {
         this.reconCnt = reconCnt;
+
+        failureDetectionThresholdEnabled(false);
     }
 
     /** {@inheritDoc} */
@@ -1239,6 +1245,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
         nodeIdMsg = new NodeIdMessage(getLocalNodeId());
 
+        initFailureDetectionThreshold();
+
         assertParameter(locPort > 1023, "locPort > 1023");
         assertParameter(locPort <= 0xffff, "locPort < 0xffff");
         assertParameter(locPortRange >= 0, "locPortRange >= 0");
@@ -1247,10 +1255,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0");
         assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
         assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
-        assertParameter(reconCnt > 0, "reconnectCnt > 0");
         assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
-        assertParameter(connTimeout >= 0, "connTimeout >= 0");
-        assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
+
+        if (!failureDetectionThresholdEnabled()) {
+            assertParameter(reconCnt > 0, "reconnectCnt > 0");
+            assertParameter(connTimeout >= 0, "connTimeout >= 0");
+            assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
+        }
+
         assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0");
         assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0");
         assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0");
@@ -1260,7 +1272,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 "Specified 'unackedMsgsBufSize' is too low, it should be at least 'msgQueueLimit * 5'.");
 
             assertParameter(unackedMsgsBufSize >= ackSndThreshold * 5,
-                "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'.");
+                            "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'.");
         }
 
         try {
@@ -1326,9 +1338,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             log.debug(configInfo("sockRcvBuf", sockRcvBuf));
             log.debug(configInfo("shmemPort", shmemPort));
             log.debug(configInfo("msgQueueLimit", msgQueueLimit));
-            log.debug(configInfo("connTimeout", connTimeout));
-            log.debug(configInfo("maxConnTimeout", maxConnTimeout));
-            log.debug(configInfo("reconCnt", reconCnt));
+
+            if (!failureDetectionThresholdEnabled()) { // Explicit timeouts apply only when the threshold is off.
+                log.debug(configInfo("connTimeout", connTimeout));
+                log.debug(configInfo("maxConnTimeout", maxConnTimeout));
+                log.debug(configInfo("reconCnt", reconCnt));
+            }
+            else
+                log.debug(configInfo("failureDetectionThreshold", failureDetectionThreshold()));
+
             log.debug(configInfo("sockWriteTimeout", sockWriteTimeout));
             log.debug(configInfo("ackSndThreshold", ackSndThreshold));
             log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize));
@@ -1850,17 +1868,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         long connTimeout0 = connTimeout;
 
+        IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(this);
+
         while (true) {
             GridCommunicationClient client;
 
             try {
                 client = new GridShmemCommunicationClient(metricsLsnr,
                     port,
-                    connTimeout,
+                    timeoutCtrl.nextTimeoutChunk(connTimeout),
                     log,
                     getSpiContext().messageFormatter());
             }
             catch (IgniteCheckedException e) {
+                if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+                    throw e;
+
                 // Reconnect for the second time, if connection is not established.
                 if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) {
                     connectAttempts++;
@@ -1872,15 +1895,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             try {
-                safeHandshake(client, null, node.id(), connTimeout0);
+                safeHandshake(client, null, node.id(), timeoutCtrl.nextTimeoutChunk(connTimeout0));
             }
-            catch (HandshakeTimeoutException e) {
+            catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
+                client.forceClose();
+
+                if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException ||
+                    timeoutCtrl.checkFailureDetectionThresholdReached(e))) {
+                    log.debug("Handshake timed out (failure threshold reached) [failureDetectionThreshold=" +
+                        failureDetectionThreshold() + ", err=" + e.getMessage() + ", client=" + client + ']');
+
+                    throw e;
+                }
+
+                assert !failureDetectionThresholdEnabled();
+
                 if (log.isDebugEnabled())
-                    log.debug("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
+                    log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
                         ", err=" + e.getMessage() + ", client=" + client + ']');
 
-                client.forceClose();
-
                 if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
                     if (log.isDebugEnabled())
                         log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
@@ -1889,8 +1922,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             ", err=" + e.getMessage() + ", client=" + client + ']');
 
                     throw e;
-                }
-                else {
+                } else {
                     attempt++;
 
                     connTimeout0 *= 2;
@@ -1994,6 +2026,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
             int attempt = 1;
 
+            IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(this);
+
             while (!conn) { // Reconnection on handshake timeout.
                 try {
                     SocketChannel ch = SocketChannel.open();
@@ -2020,9 +2054,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     long rcvCnt = -1;
 
                     try {
-                        ch.socket().connect(addr, (int)connTimeout);
+                        ch.socket().connect(addr, (int)timeoutCtrl.nextTimeoutChunk(connTimeout));
 
-                        rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0);
+                        rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), timeoutCtrl.nextTimeoutChunk(connTimeout0));
 
                         if (rcvCnt == -1)
                             return null;
@@ -2056,14 +2090,38 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         }
                     }
                 }
-                catch (HandshakeTimeoutException e) {
+                catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
                     if (client != null) {
                         client.forceClose();
 
                         client = null;
                     }
 
-                    onException("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
+                    if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException ||
+                        timeoutCtrl.checkFailureDetectionThresholdReached(e))) {
+
+                        String msg = "Handshake timed out (failure detection threshold is reached) " +
+                            "[failureDetectionThreshold=" + failureDetectionThreshold() + ", addr=" + addr + ']';
+
+                        onException(msg, e);
+
+                        if (log.isDebugEnabled())
+                            log.debug(msg);
+
+                        if (errs == null)
+                            errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
+                                "Make sure that each GridComputeTask and GridCacheTransaction has a timeout set " +
+                                "in order to prevent parties from waiting forever in case of network issues " +
+                                "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+
+                        errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+
+                        break;
+                    }
+
+                    assert !failureDetectionThresholdEnabled();
+
+                    onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
                         ", addr=" + addr + ']', e);
 
                     if (log.isDebugEnabled())
@@ -2108,7 +2166,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (log.isDebugEnabled())
                         log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
 
-                    if (X.hasCause(e, SocketTimeoutException.class))
+                    boolean failureDetThrReached = timeoutCtrl.checkFailureDetectionThresholdReached(e);
+
+                    if (failureDetThrReached)
+                        LT.warn(log, null, "Connect timed out (consider increasing 'failureDetectionThreshold' " +
+                            "configuration property) [addr=" + addr + ", failureDetectionThreshold=" +
+                            failureDetectionThreshold() + ']');
+                    else if (X.hasCause(e, SocketTimeoutException.class))
                         LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " +
                             "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']');
 
@@ -2121,7 +2185,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
 
                     // Reconnect for the second time, if connection is not established.
-                    if (connectAttempts < 2 &&
+                    if (!failureDetThrReached && connectAttempts < 2 &&
                         (e instanceof ConnectException || X.hasCause(e, ConnectException.class))) {
                         connectAttempts++;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 5ed946b..5be7ab9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1560,7 +1560,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
     /** {@inheritDoc} */
     @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
-        super.spiStart(gridName);
+        initFailureDetectionThreshold();
 
         if (!failureDetectionThresholdEnabled()) {
             if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) {