You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2020/09/23 18:36:38 UTC

[ignite] branch master updated: Revert "IGNITE-13134 : checking if failureDetectionTimeout enabled. - Fixes #7916."

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 16e1bc2  Revert "IGNITE-13134 : checking if failureDetectionTimeout enabled. - Fixes #7916."
16e1bc2 is described below

commit 16e1bc25216e00834ffc38dc0a0444097ae651a1
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Wed Sep 23 21:26:36 2020 +0300

    Revert "IGNITE-13134 : checking if failureDetectionTimeout enabled. - Fixes #7916."
    
    This reverts commit 1ca486eb - Fixes #8265.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../spi/IgniteSpiOperationTimeoutHelper.java       | 86 ++++++++++++----------
 .../tcp/internal/ConnectionClientPool.java         |  2 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java       |  2 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java       | 79 ++++++--------------
 .../ignite/spi/discovery/tcp/TcpDiscoverySpi.java  | 11 ++-
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java    |  2 +-
 6 files changed, 79 insertions(+), 103 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
index 96fea9e..48161ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
@@ -30,11 +30,20 @@ import org.apache.ignite.internal.util.typedef.internal.U;
  *
  */
 public class IgniteSpiOperationTimeoutHelper {
-    /** Flag whether to use timeout. */
-    private final boolean timeoutEnabled;
+    // https://issues.apache.org/jira/browse/IGNITE-11221
+    // We need to reuse the new ExponentialBackoffTimeout logic in TcpDiscovery instead of this class.
 
-    /** Time in nanos which cannot be reached for current operation. */
-    private final long timeoutThreshold;
+    /** */
+    private long lastOperStartNanos;
+
+    /** */
+    private long timeout;
+
+    /** */
+    private final boolean failureDetectionTimeoutEnabled;
+
+    /** */
+    private final long failureDetectionTimeout;
 
     /**
      * Constructor.
@@ -43,7 +52,9 @@ public class IgniteSpiOperationTimeoutHelper {
      * @param srvOp {@code True} if communicates with server node.
      */
     public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp) {
-        this(adapter, srvOp, -1, -1);
+        failureDetectionTimeoutEnabled = adapter.failureDetectionTimeoutEnabled();
+        failureDetectionTimeout = srvOp ? adapter.failureDetectionTimeout() :
+            adapter.clientFailureDetectionTimeout();
     }
 
     /**
@@ -51,26 +62,15 @@ public class IgniteSpiOperationTimeoutHelper {
      *
      * @param adapter SPI adapter.
      * @param srvOp {@code True} if communicates with server node.
-     * @param lastRelatedOperationTime Time of last related operation in nanos. Ignored if negative, 0 or
-     * {@code adapter.failureDetectionTimeoutEnabled()} is false.
-     * @param absoluteThreshold Absolute time threshold (nanos) which must not be reached. Ignored if negative or 0.
+     * @param lastOperStartNanos Time of last related operation in nanos.
      */
-    public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp, long lastRelatedOperationTime,
-        long absoluteThreshold) {
-        timeoutEnabled = adapter.failureDetectionTimeoutEnabled();
+    public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp, long lastOperStartNanos) {
+        this(adapter, srvOp);
 
-        if (timeoutEnabled) {
-            long timeout = (lastRelatedOperationTime > 0 ? lastRelatedOperationTime : System.nanoTime()) +
-                U.millisToNanos(srvOp ? adapter.failureDetectionTimeout() : adapter.clientFailureDetectionTimeout());
+        this.lastOperStartNanos = lastOperStartNanos;
 
-            if (absoluteThreshold > 0 && timeout > absoluteThreshold)
-                timeout = absoluteThreshold;
-
-            timeoutThreshold = timeout;
-        } else {
-            // Save absolute threshold if it is set.
-            timeoutThreshold = absoluteThreshold > 0 ? absoluteThreshold : 0;
-        }
+        if (lastOperStartNanos > 0)
+            timeout = failureDetectionTimeout;
     }
 
     /**
@@ -85,32 +85,42 @@ public class IgniteSpiOperationTimeoutHelper {
      * this {@code IgniteSpiOperationTimeoutController}.
      */
     public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException {
-        long now = System.nanoTime();
-
-        long left;
+        if (!failureDetectionTimeoutEnabled)
+            return dfltTimeout;
 
-        if (timeoutEnabled)
-            left = timeoutThreshold - now;
+        if (lastOperStartNanos == 0) {
+            timeout = failureDetectionTimeout;
+            lastOperStartNanos = System.nanoTime();
+        }
         else {
-            left = U.millisToNanos(dfltTimeout);
+            long curNanos = System.nanoTime();
 
-            if (timeoutThreshold > 0 && now + left >= timeoutThreshold)
-                left = timeoutThreshold - now;
-        }
+            timeout -= U.nanosToMillis(curNanos - lastOperStartNanos);
 
-        if (left <= 0)
-            throw new IgniteSpiOperationTimeoutException("Network operation timed out.");
+            lastOperStartNanos = curNanos;
 
-        return U.nanosToMillis(left);
+            if (timeout <= 0)
+                throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " +
+                    "'failureDetectionTimeout' configuration property [failureDetectionTimeout="
+                    + failureDetectionTimeout + ']');
+        }
+        
+        return timeout;
     }
 
     /**
-     * Checks whether the given {@link Exception} is a timeout.
+     * Checks whether the given {@link Exception} is generated because failure detection timeout has been reached.
      *
-     * @param e Exception to check.
-     * @return {@code True} if given exception is a timeout. {@code False} otherwise.
+     * @param e Exception.
+     * @return {@code true} if failure detection timeout is reached, {@code false} otherwise.
      */
     public boolean checkFailureTimeoutReached(Exception e) {
-        return X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketTimeoutException.class, SocketException.class);
+        if (!failureDetectionTimeoutEnabled)
+            return false;
+
+        if (X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketTimeoutException.class, SocketException.class))
+            return true;
+
+        return (timeout - U.millisSinceNanos(lastOperStartNanos) <= 0);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
index 14a0219..9476e44 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
@@ -521,7 +521,7 @@ public class ConnectionClientPool {
                     msgFormatterSupplier.get());
             }
             catch (IgniteCheckedException e) {
-                if (connectAttempts == 1 && timeoutHelper.checkFailureTimeoutReached(e))
+                if (timeoutHelper.checkFailureTimeoutReached(e))
                     throw e;
 
                 // Reconnect for the second time, if connection is not established.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 1df5abb..5017e18 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -841,7 +841,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     break;
                 }
 
-                if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e))
+                if (timeoutHelper.checkFailureTimeoutReached(e))
                     break;
 
                 if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index b85d672c..9d5e3ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -922,7 +922,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         if (!openedSock && reconCnt == 2)
                             break;
 
-                        if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e))
+                        if (timeoutHelper.checkFailureTimeoutReached(e))
                             break;
                         else if (!spi.failureDetectionTimeoutEnabled() && reconCnt == spi.getReconnectCount())
                             break;
@@ -1576,7 +1576,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     break;
                 }
 
-                if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e))
+                if (timeoutHelper.checkFailureTimeoutReached(e))
                     break;
 
                 if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
@@ -3477,12 +3477,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     while (true) {
                         if (sock == null) {
-                            // We re-create the helper here because it could be created earlier with wrong timeout on
-                            // message sending like IgniteConfiguration.failureDetectionTimeout. Here we are in the
-                            // state of conenction recovering and have to work with
-                            // TcpDiscoverSpi.getEffectiveConnectionRecoveryTimeout()
-                            if (timeoutHelper == null || sndState != null)
-                                timeoutHelper = serverOperationTimeoutHelper(sndState, -1);
+                            if (timeoutHelper == null)
+                                timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true);
 
                             boolean success = false;
 
@@ -3521,8 +3517,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                 // We should take previousNodeAlive flag into account only if we received the response from the correct node.
                                 if (res.creatorNodeId().equals(next.id()) && res.previousNodeAlive() && sndState != null) {
-                                    sndState.checkTimeout();
-
                                     // Remote node checked connection to it's previous and got success.
                                     boolean previousNode = sndState.markLastFailedNodeAlive();
 
@@ -3630,20 +3624,13 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                 onException("Failed to connect to next node [msg=" + msg + ", err=" + e + ']', e);
 
-                                // Fastens failure detection.
-                                if (sndState != null && sndState.checkTimeout()) {
-                                    segmentLocalNodeOnSendFail(failedNodes);
-
-                                    return; // Nothing to do here.
-                                }
-
                                 if (!openSock)
                                     break; // Don't retry if we can not establish connection.
 
                                 if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
                                     break;
 
-                                if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e))
+                                if (timeoutHelper.checkFailureTimeoutReached(e))
                                     break;
                                 else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof
                                     SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) {
@@ -3702,8 +3689,10 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                     addFailedNodes(pendingMsg, failedNodes);
 
-                                    if (timeoutHelper == null)
-                                        timeoutHelper = serverOperationTimeoutHelper(sndState, lastRingMsgSentTime);
+                                    if (timeoutHelper == null) {
+                                        timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true,
+                                            lastRingMsgSentTime);
+                                    }
 
                                     try {
                                         spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk(
@@ -3747,7 +3736,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 long tsNanos = System.nanoTime();
 
                                 if (timeoutHelper == null)
-                                    timeoutHelper = serverOperationTimeoutHelper(sndState, lastRingMsgSentTime);
+                                    timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true, lastRingMsgSentTime);
 
                                 addFailedNodes(msg, failedNodes);
 
@@ -3814,7 +3803,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']',
                                 e);
 
-                            if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e))
+                            if (timeoutHelper.checkFailureTimeoutReached(e))
                                 break;
 
                             if (!spi.failureDetectionTimeoutEnabled()) {
@@ -3851,11 +3840,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (!sent) {
                     if (sndState == null && spi.getEffectiveConnectionRecoveryTimeout() > 0)
                         sndState = new CrossRingMessageSendState();
-                    else if (sndState != null && sndState.checkTimeout()) {
-                        segmentLocalNodeOnSendFail(failedNodes);
-
-                        return; // Nothing to do here.
-                    }
 
                     boolean failedNextNode = sndState == null || sndState.markNextNodeFailed();
 
@@ -3889,6 +3873,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                     }
 
+                    if (sndState != null && sndState.isFailed()) {
+                        segmentLocalNodeOnSendFail(failedNodes);
+
+                        return; // Nothing to do here.
+                    }
+
                     next = null;
 
                     errs = null;
@@ -6511,19 +6501,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
     }
 
-    /**
-     * Creates proper timeout helper taking in account current send state.
-     *
-     * @param sndState Current connection recovering state. Ignored if {@code null}.
-     * @param lastOperationNanos Time of last related operation. Ignored if negative or 0.
-     * @return Timeout helper.
-     */
-    private IgniteSpiOperationTimeoutHelper serverOperationTimeoutHelper(@Nullable CrossRingMessageSendState sndState,
-        long lastOperationNanos) {
-        return new IgniteSpiOperationTimeoutHelper(spi, true, lastOperationNanos,
-            sndState == null ? -1 : sndState.failTimeNanos);
-    }
-
     /** Fixates time of last sent message. */
     private void updateLastSentMessageTime() {
         lastRingMsgSentTime = System.nanoTime();
@@ -8220,22 +8197,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * Checks if message sending has completely failed due to the timeout. Sets {@code RingMessageSendState#FAILED}
-         * if the timeout is reached.
-         *
-         * @return {@code True} if passed timeout is reached. {@code False} otherwise.
-         */
-        boolean checkTimeout() {
-            if (System.nanoTime() >= failTimeNanos) {
-                state = RingMessageSendState.FAILED;
-
-                return true;
-            }
-
-            return false;
-        }
-
-        /**
          * Marks last failed node as alive.
          *
          * @return {@code False} if all failed nodes marked as alive or incorrect state.
@@ -8247,6 +8208,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (--failedNodes <= 0) {
                     failedNodes = 0;
 
+                    if (System.nanoTime() - failTimeNanos >= 0) {
+                        state = RingMessageSendState.FAILED;
+
+                        return false;
+                    }
+
                     state = RingMessageSendState.STARTING_POINT;
                 }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 9bdb886..f1e8406 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -2530,12 +2530,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery
 
                 LT.warn(log, "Socket write has timed out (consider increasing " +
                     (failureDetectionTimeoutEnabled() ?
-                        "'IgniteConfiguration.failureDetectionTimeout' and 'connRecoveryTimeout' configuration " +
-                            "properties) [failureDetectionTimeout=" + failureDetectionTimeout() :
-                        "'sockTimeout' and 'connRecoveryTimeout' configuration properties) [sockTimeout="
-                            + sockTimeout) + ", connRecoveryTimeout=" + getConnectionRecoveryTimeout() + ", rmtAddr="
-                    + sock.getRemoteSocketAddress() + ", rmtPort=" + sock.getPort() + ", sockTimeout=" + sockTimeout
-                    + ']');
+                            "'IgniteConfiguration.failureDetectionTimeout' configuration property) [" +
+                                    "failureDetectionTimeout=" + failureDetectionTimeout() :
+                            "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout) +
+                        ", rmtAddr=" + sock.getRemoteSocketAddress() + ", rmtPort=" + sock.getPort() +
+                        ", sockTimeout=" + sockTimeout + ']');
             }
         }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index fcf510f..16fd751 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -1976,7 +1976,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
 
             ignite1.configuration().getDiscoverySpi().failNode(coordId, null);
 
-            assertTrue(failedLatch.await(ignite1.configuration().getFailureDetectionTimeout(), MILLISECONDS));
+            assertTrue(failedLatch.await(2000, MILLISECONDS));
 
             assertTrue(coordSegmented.get());