You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2020/09/23 18:36:38 UTC
[ignite] branch master updated: Revert "IGNITE-13134 : checking if
failureDetectionTimeout enabled. - Fixes #7916."
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 16e1bc2 Revert "IGNITE-13134 : checking if failureDetectionTimeout enabled. - Fixes #7916."
16e1bc2 is described below
commit 16e1bc25216e00834ffc38dc0a0444097ae651a1
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Wed Sep 23 21:26:36 2020 +0300
Revert "IGNITE-13134 : checking if failureDetectionTimeout enabled. - Fixes #7916."
This reverts commit 1ca486eb - Fixes #8265.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../spi/IgniteSpiOperationTimeoutHelper.java | 86 ++++++++++++----------
.../tcp/internal/ConnectionClientPool.java | 2 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 2 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 79 ++++++--------------
.../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 11 ++-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 2 +-
6 files changed, 79 insertions(+), 103 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
index 96fea9e..48161ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
@@ -30,11 +30,20 @@ import org.apache.ignite.internal.util.typedef.internal.U;
*
*/
public class IgniteSpiOperationTimeoutHelper {
- /** Flag whether to use timeout. */
- private final boolean timeoutEnabled;
+ // https://issues.apache.org/jira/browse/IGNITE-11221
+ // We need to reuse the new ExponentialBackoffTimeout logic in TcpDiscovery instead of this class.
- /** Time in nanos which cannot be reached for current operation. */
- private final long timeoutThreshold;
+ /** */
+ private long lastOperStartNanos;
+
+ /** */
+ private long timeout;
+
+ /** */
+ private final boolean failureDetectionTimeoutEnabled;
+
+ /** */
+ private final long failureDetectionTimeout;
/**
* Constructor.
@@ -43,7 +52,9 @@ public class IgniteSpiOperationTimeoutHelper {
* @param srvOp {@code True} if communicates with server node.
*/
public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp) {
- this(adapter, srvOp, -1, -1);
+ failureDetectionTimeoutEnabled = adapter.failureDetectionTimeoutEnabled();
+ failureDetectionTimeout = srvOp ? adapter.failureDetectionTimeout() :
+ adapter.clientFailureDetectionTimeout();
}
/**
@@ -51,26 +62,15 @@ public class IgniteSpiOperationTimeoutHelper {
*
* @param adapter SPI adapter.
* @param srvOp {@code True} if communicates with server node.
- * @param lastRelatedOperationTime Time of last related operation in nanos. Ignored if negative, 0 or
- * {@code adapter.failureDetectionTimeoutEnabled()} is false.
- * @param absoluteThreshold Absolute time threshold (nanos) which must not be reached. Ignored if negative or 0.
+ * @param lastOperStartNanos Time of last related operation in nanos.
*/
- public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp, long lastRelatedOperationTime,
- long absoluteThreshold) {
- timeoutEnabled = adapter.failureDetectionTimeoutEnabled();
+ public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp, long lastOperStartNanos) {
+ this(adapter, srvOp);
- if (timeoutEnabled) {
- long timeout = (lastRelatedOperationTime > 0 ? lastRelatedOperationTime : System.nanoTime()) +
- U.millisToNanos(srvOp ? adapter.failureDetectionTimeout() : adapter.clientFailureDetectionTimeout());
+ this.lastOperStartNanos = lastOperStartNanos;
- if (absoluteThreshold > 0 && timeout > absoluteThreshold)
- timeout = absoluteThreshold;
-
- timeoutThreshold = timeout;
- } else {
- // Save absolute threshold if it is set.
- timeoutThreshold = absoluteThreshold > 0 ? absoluteThreshold : 0;
- }
+ if (lastOperStartNanos > 0)
+ timeout = failureDetectionTimeout;
}
/**
@@ -85,32 +85,42 @@ public class IgniteSpiOperationTimeoutHelper {
* this {@code IgniteSpiOperationTimeoutController}.
*/
public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException {
- long now = System.nanoTime();
-
- long left;
+ if (!failureDetectionTimeoutEnabled)
+ return dfltTimeout;
- if (timeoutEnabled)
- left = timeoutThreshold - now;
+ if (lastOperStartNanos == 0) {
+ timeout = failureDetectionTimeout;
+ lastOperStartNanos = System.nanoTime();
+ }
else {
- left = U.millisToNanos(dfltTimeout);
+ long curNanos = System.nanoTime();
- if (timeoutThreshold > 0 && now + left >= timeoutThreshold)
- left = timeoutThreshold - now;
- }
+ timeout -= U.nanosToMillis(curNanos - lastOperStartNanos);
- if (left <= 0)
- throw new IgniteSpiOperationTimeoutException("Network operation timed out.");
+ lastOperStartNanos = curNanos;
- return U.nanosToMillis(left);
+ if (timeout <= 0)
+ throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " +
+ "'failureDetectionTimeout' configuration property [failureDetectionTimeout="
+ + failureDetectionTimeout + ']');
+ }
+
+ return timeout;
}
/**
- * Checks whether the given {@link Exception} is a timeout.
+ * Checks whether the given {@link Exception} is generated because failure detection timeout has been reached.
*
- * @param e Exception to check.
- * @return {@code True} if given exception is a timeout. {@code False} otherwise.
+ * @param e Exception.
+ * @return {@code true} if failure detection timeout is reached, {@code false} otherwise.
*/
public boolean checkFailureTimeoutReached(Exception e) {
- return X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketTimeoutException.class, SocketException.class);
+ if (!failureDetectionTimeoutEnabled)
+ return false;
+
+ if (X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketTimeoutException.class, SocketException.class))
+ return true;
+
+ return (timeout - U.millisSinceNanos(lastOperStartNanos) <= 0);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
index 14a0219..9476e44 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
@@ -521,7 +521,7 @@ public class ConnectionClientPool {
msgFormatterSupplier.get());
}
catch (IgniteCheckedException e) {
- if (connectAttempts == 1 && timeoutHelper.checkFailureTimeoutReached(e))
+ if (timeoutHelper.checkFailureTimeoutReached(e))
throw e;
// Reconnect for the second time, if connection is not established.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 1df5abb..5017e18 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -841,7 +841,7 @@ class ClientImpl extends TcpDiscoveryImpl {
break;
}
- if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e))
+ if (timeoutHelper.checkFailureTimeoutReached(e))
break;
if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index b85d672c..9d5e3ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -922,7 +922,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (!openedSock && reconCnt == 2)
break;
- if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e))
+ if (timeoutHelper.checkFailureTimeoutReached(e))
break;
else if (!spi.failureDetectionTimeoutEnabled() && reconCnt == spi.getReconnectCount())
break;
@@ -1576,7 +1576,7 @@ class ServerImpl extends TcpDiscoveryImpl {
break;
}
- if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e))
+ if (timeoutHelper.checkFailureTimeoutReached(e))
break;
if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
@@ -3477,12 +3477,8 @@ class ServerImpl extends TcpDiscoveryImpl {
while (true) {
if (sock == null) {
- // We re-create the helper here because it could be created earlier with wrong timeout on
- // message sending like IgniteConfiguration.failureDetectionTimeout. Here we are in the
- // state of conenction recovering and have to work with
- // TcpDiscoverSpi.getEffectiveConnectionRecoveryTimeout()
- if (timeoutHelper == null || sndState != null)
- timeoutHelper = serverOperationTimeoutHelper(sndState, -1);
+ if (timeoutHelper == null)
+ timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true);
boolean success = false;
@@ -3521,8 +3517,6 @@ class ServerImpl extends TcpDiscoveryImpl {
// We should take previousNodeAlive flag into account only if we received the response from the correct node.
if (res.creatorNodeId().equals(next.id()) && res.previousNodeAlive() && sndState != null) {
- sndState.checkTimeout();
-
// Remote node checked connection to it's previous and got success.
boolean previousNode = sndState.markLastFailedNodeAlive();
@@ -3630,20 +3624,13 @@ class ServerImpl extends TcpDiscoveryImpl {
onException("Failed to connect to next node [msg=" + msg + ", err=" + e + ']', e);
- // Fastens failure detection.
- if (sndState != null && sndState.checkTimeout()) {
- segmentLocalNodeOnSendFail(failedNodes);
-
- return; // Nothing to do here.
- }
-
if (!openSock)
break; // Don't retry if we can not establish connection.
if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
break;
- if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e))
+ if (timeoutHelper.checkFailureTimeoutReached(e))
break;
else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof
SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) {
@@ -3702,8 +3689,10 @@ class ServerImpl extends TcpDiscoveryImpl {
addFailedNodes(pendingMsg, failedNodes);
- if (timeoutHelper == null)
- timeoutHelper = serverOperationTimeoutHelper(sndState, lastRingMsgSentTime);
+ if (timeoutHelper == null) {
+ timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true,
+ lastRingMsgSentTime);
+ }
try {
spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk(
@@ -3747,7 +3736,7 @@ class ServerImpl extends TcpDiscoveryImpl {
long tsNanos = System.nanoTime();
if (timeoutHelper == null)
- timeoutHelper = serverOperationTimeoutHelper(sndState, lastRingMsgSentTime);
+ timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true, lastRingMsgSentTime);
addFailedNodes(msg, failedNodes);
@@ -3814,7 +3803,7 @@ class ServerImpl extends TcpDiscoveryImpl {
onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']',
e);
- if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e))
+ if (timeoutHelper.checkFailureTimeoutReached(e))
break;
if (!spi.failureDetectionTimeoutEnabled()) {
@@ -3851,11 +3840,6 @@ class ServerImpl extends TcpDiscoveryImpl {
if (!sent) {
if (sndState == null && spi.getEffectiveConnectionRecoveryTimeout() > 0)
sndState = new CrossRingMessageSendState();
- else if (sndState != null && sndState.checkTimeout()) {
- segmentLocalNodeOnSendFail(failedNodes);
-
- return; // Nothing to do here.
- }
boolean failedNextNode = sndState == null || sndState.markNextNodeFailed();
@@ -3889,6 +3873,12 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
+ if (sndState != null && sndState.isFailed()) {
+ segmentLocalNodeOnSendFail(failedNodes);
+
+ return; // Nothing to do here.
+ }
+
next = null;
errs = null;
@@ -6511,19 +6501,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- /**
- * Creates proper timeout helper taking in account current send state.
- *
- * @param sndState Current connection recovering state. Ignored if {@code null}.
- * @param lastOperationNanos Time of last related operation. Ignored if negative or 0.
- * @return Timeout helper.
- */
- private IgniteSpiOperationTimeoutHelper serverOperationTimeoutHelper(@Nullable CrossRingMessageSendState sndState,
- long lastOperationNanos) {
- return new IgniteSpiOperationTimeoutHelper(spi, true, lastOperationNanos,
- sndState == null ? -1 : sndState.failTimeNanos);
- }
-
/** Fixates time of last sent message. */
private void updateLastSentMessageTime() {
lastRingMsgSentTime = System.nanoTime();
@@ -8220,22 +8197,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * Checks if message sending has completely failed due to the timeout. Sets {@code RingMessageSendState#FAILED}
- * if the timeout is reached.
- *
- * @return {@code True} if passed timeout is reached. {@code False} otherwise.
- */
- boolean checkTimeout() {
- if (System.nanoTime() >= failTimeNanos) {
- state = RingMessageSendState.FAILED;
-
- return true;
- }
-
- return false;
- }
-
- /**
* Marks last failed node as alive.
*
* @return {@code False} if all failed nodes marked as alive or incorrect state.
@@ -8247,6 +8208,12 @@ class ServerImpl extends TcpDiscoveryImpl {
if (--failedNodes <= 0) {
failedNodes = 0;
+ if (System.nanoTime() - failTimeNanos >= 0) {
+ state = RingMessageSendState.FAILED;
+
+ return false;
+ }
+
state = RingMessageSendState.STARTING_POINT;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 9bdb886..f1e8406 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -2530,12 +2530,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery
LT.warn(log, "Socket write has timed out (consider increasing " +
(failureDetectionTimeoutEnabled() ?
- "'IgniteConfiguration.failureDetectionTimeout' and 'connRecoveryTimeout' configuration " +
- "properties) [failureDetectionTimeout=" + failureDetectionTimeout() :
- "'sockTimeout' and 'connRecoveryTimeout' configuration properties) [sockTimeout="
- + sockTimeout) + ", connRecoveryTimeout=" + getConnectionRecoveryTimeout() + ", rmtAddr="
- + sock.getRemoteSocketAddress() + ", rmtPort=" + sock.getPort() + ", sockTimeout=" + sockTimeout
- + ']');
+ "'IgniteConfiguration.failureDetectionTimeout' configuration property) [" +
+ "failureDetectionTimeout=" + failureDetectionTimeout() :
+ "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout) +
+ ", rmtAddr=" + sock.getRemoteSocketAddress() + ", rmtPort=" + sock.getPort() +
+ ", sockTimeout=" + sockTimeout + ']');
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index fcf510f..16fd751 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -1976,7 +1976,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
ignite1.configuration().getDiscoverySpi().failNode(coordId, null);
- assertTrue(failedLatch.await(ignite1.configuration().getFailureDetectionTimeout(), MILLISECONDS));
+ assertTrue(failedLatch.await(2000, MILLISECONDS));
assertTrue(coordSegmented.get());