You are viewing a plain-text version of this content; the canonical link to it (originally a hyperlink labeled "here") is not preserved in this copy.
Posted to commits@ignite.apache.org by al...@apache.org on 2020/09/23 18:39:29 UTC
[ignite] branch ignite-2.9 updated: Revert "IGNITE-13134 : checking if failureDetectionTimeout enabled. - Fixes #7916."
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch ignite-2.9
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-2.9 by this push:
new 6697b2e Revert "IGNITE-13134 : checking if failureDetectionTimeout enabled. - Fixes #7916."
6697b2e is described below
commit 6697b2e22dce6971d341873285e7fddefebb7ce7
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Wed Sep 23 21:37:22 2020 +0300
Revert "IGNITE-13134 : checking if failureDetectionTimeout enabled. - Fixes #7916."
This reverts commit 06364dca - Fixes #8264.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../spi/IgniteSpiOperationTimeoutHelper.java | 83 +++++++++++-----------
.../spi/communication/tcp/TcpCommunicationSpi.java | 2 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 2 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 79 ++++++--------------
.../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 11 ++-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 2 +-
6 files changed, 71 insertions(+), 108 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
index 20a5ebf..48161ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
@@ -33,14 +33,17 @@ public class IgniteSpiOperationTimeoutHelper {
// https://issues.apache.org/jira/browse/IGNITE-11221
// We need to reuse new logic ExponentialBackoffTimeout logic in TcpDiscovery instead of this class.
- /** Flag whether to use timeout. */
- private final boolean timeoutEnabled;
+ /** */
+ private long lastOperStartNanos;
- /** Time in nanos which cannot be reached for current operation. */
- private final long timeoutThreshold;
+ /** */
+ private long timeout;
- /** Keeps {@code true} if last call to {@link #nextTimeoutChunk(long)} has timeouted. {@code False} otherwise. */
- private boolean lastOperationTimeouted;
+ /** */
+ private final boolean failureDetectionTimeoutEnabled;
+
+ /** */
+ private final long failureDetectionTimeout;
/**
* Constructor.
@@ -49,7 +52,9 @@ public class IgniteSpiOperationTimeoutHelper {
* @param srvOp {@code True} if communicates with server node.
*/
public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp) {
- this(adapter, srvOp, -1, -1);
+ failureDetectionTimeoutEnabled = adapter.failureDetectionTimeoutEnabled();
+ failureDetectionTimeout = srvOp ? adapter.failureDetectionTimeout() :
+ adapter.clientFailureDetectionTimeout();
}
/**
@@ -57,26 +62,15 @@ public class IgniteSpiOperationTimeoutHelper {
*
* @param adapter SPI adapter.
* @param srvOp {@code True} if communicates with server node.
- * @param lastRelatedOperationTime Time of last related operation in nanos. Ignored if negative, 0 or
- * {@code adapter.failureDetectionTimeoutEnabled()} is false.
- * @param absoluteThreshold Absolute time threshold (nanos) which must not be reached. Ignored if negative or 0.
+ * @param lastOperStartNanos Time of last related operation in nanos.
*/
- public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp, long lastRelatedOperationTime,
- long absoluteThreshold) {
- timeoutEnabled = adapter.failureDetectionTimeoutEnabled();
-
- if (timeoutEnabled) {
- long timeout = (lastRelatedOperationTime > 0 ? lastRelatedOperationTime : System.nanoTime()) +
- U.millisToNanos(srvOp ? adapter.failureDetectionTimeout() : adapter.clientFailureDetectionTimeout());
+ public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp, long lastOperStartNanos) {
+ this(adapter, srvOp);
- if (absoluteThreshold > 0 && timeout > absoluteThreshold)
- timeout = absoluteThreshold;
+ this.lastOperStartNanos = lastOperStartNanos;
- timeoutThreshold = timeout;
- } else {
- // Save absolute threshold if it is set.
- timeoutThreshold = absoluteThreshold > 0 ? absoluteThreshold : 0;
- }
+ if (lastOperStartNanos > 0)
+ timeout = failureDetectionTimeout;
}
/**
@@ -91,39 +85,42 @@ public class IgniteSpiOperationTimeoutHelper {
* this {@code IgniteSpiOperationTimeoutController}.
*/
public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException {
- long now = System.nanoTime();
-
- long left;
+ if (!failureDetectionTimeoutEnabled)
+ return dfltTimeout;
- if (timeoutEnabled)
- left = timeoutThreshold - now;
+ if (lastOperStartNanos == 0) {
+ timeout = failureDetectionTimeout;
+ lastOperStartNanos = System.nanoTime();
+ }
else {
- left = U.millisToNanos(dfltTimeout);
+ long curNanos = System.nanoTime();
- if (timeoutThreshold > 0 && now + left >= timeoutThreshold)
- left = timeoutThreshold - now;
- }
+ timeout -= U.nanosToMillis(curNanos - lastOperStartNanos);
- if (left <= 0) {
- lastOperationTimeouted = true;
+ lastOperStartNanos = curNanos;
- throw new IgniteSpiOperationTimeoutException("Network operation timed out.");
+ if (timeout <= 0)
+ throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " +
+ "'failureDetectionTimeout' configuration property [failureDetectionTimeout="
+ + failureDetectionTimeout + ']');
}
-
- return U.nanosToMillis(left);
+
+ return timeout;
}
/**
- * Checks whether the given {@link Exception} is a timeout-exception or the has been reached in last call to
- * {@code nextTimeoutChunk(long)}.
+ * Checks whether the given {@link Exception} is generated because failure detection timeout has been reached.
*
- * @param e Exception to check if is a timeout.
- * @return {@code True} if the excaption is a timeout or failure timeout was reached. {@code False} otherwise.
+ * @param e Exception.
+ * @return {@code true} if failure detection timeout is reached, {@code false} otherwise.
*/
public boolean checkFailureTimeoutReached(Exception e) {
+ if (!failureDetectionTimeoutEnabled)
+ return false;
+
if (X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketTimeoutException.class, SocketException.class))
return true;
- return lastOperationTimeouted;
+ return (timeout - U.millisSinceNanos(lastOperStartNanos) <= 0);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index be5f524..2598cbf 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -3442,7 +3442,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
getSpiContext().messageFormatter());
}
catch (IgniteCheckedException e) {
- if (connectAttempts == 1 && timeoutHelper.checkFailureTimeoutReached(e))
+ if (timeoutHelper.checkFailureTimeoutReached(e))
throw e;
// Reconnect for the second time, if connection is not established.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 3ae597e..4193306 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -826,7 +826,7 @@ class ClientImpl extends TcpDiscoveryImpl {
break;
}
- if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e))
+ if (timeoutHelper.checkFailureTimeoutReached(e))
break;
if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 287347d..6992ef5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -923,7 +923,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (!openedSock && reconCnt == 2)
break;
- if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e))
+ if (timeoutHelper.checkFailureTimeoutReached(e))
break;
else if (!spi.failureDetectionTimeoutEnabled() && reconCnt == spi.getReconnectCount())
break;
@@ -1579,7 +1579,7 @@ class ServerImpl extends TcpDiscoveryImpl {
break;
}
- if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e))
+ if (timeoutHelper.checkFailureTimeoutReached(e))
break;
if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
@@ -3480,12 +3480,8 @@ class ServerImpl extends TcpDiscoveryImpl {
while (true) {
if (sock == null) {
- // We re-create the helper here because it could be created earlier with wrong timeout on
- // message sending like IgniteConfiguration.failureDetectionTimeout. Here we are in the
- // state of conenction recovering and have to work with
- // TcpDiscoverSpi.getEffectiveConnectionRecoveryTimeout()
- if (timeoutHelper == null || sndState != null)
- timeoutHelper = serverOperationTimeoutHelper(sndState, -1);
+ if (timeoutHelper == null)
+ timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true);
boolean success = false;
@@ -3524,8 +3520,6 @@ class ServerImpl extends TcpDiscoveryImpl {
// We should take previousNodeAlive flag into account only if we received the response from the correct node.
if (res.creatorNodeId().equals(next.id()) && res.previousNodeAlive() && sndState != null) {
- sndState.checkTimeout();
-
// Remote node checked connection to it's previous and got success.
boolean previousNode = sndState.markLastFailedNodeAlive();
@@ -3635,20 +3629,13 @@ class ServerImpl extends TcpDiscoveryImpl {
onException("Failed to connect to next node [msg=" + msg + ", err=" + e + ']', e);
- // Fastens failure detection.
- if (sndState != null && sndState.checkTimeout()) {
- segmentLocalNodeOnSendFail(failedNodes);
-
- return; // Nothing to do here.
- }
-
if (!openSock)
break; // Don't retry if we can not establish connection.
if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
break;
- if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e))
+ if (timeoutHelper.checkFailureTimeoutReached(e))
break;
else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof
SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) {
@@ -3707,8 +3694,10 @@ class ServerImpl extends TcpDiscoveryImpl {
addFailedNodes(pendingMsg, failedNodes);
- if (timeoutHelper == null)
- timeoutHelper = serverOperationTimeoutHelper(sndState, lastRingMsgSentTime);
+ if (timeoutHelper == null) {
+ timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true,
+ lastRingMsgSentTime);
+ }
try {
spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk(
@@ -3752,7 +3741,7 @@ class ServerImpl extends TcpDiscoveryImpl {
long tsNanos = System.nanoTime();
if (timeoutHelper == null)
- timeoutHelper = serverOperationTimeoutHelper(sndState, lastRingMsgSentTime);
+ timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true, lastRingMsgSentTime);
addFailedNodes(msg, failedNodes);
@@ -3819,7 +3808,7 @@ class ServerImpl extends TcpDiscoveryImpl {
onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']',
e);
- if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e))
+ if (timeoutHelper.checkFailureTimeoutReached(e))
break;
if (!spi.failureDetectionTimeoutEnabled()) {
@@ -3856,11 +3845,6 @@ class ServerImpl extends TcpDiscoveryImpl {
if (!sent) {
if (sndState == null && spi.getEffectiveConnectionRecoveryTimeout() > 0)
sndState = new CrossRingMessageSendState();
- else if (sndState != null && sndState.checkTimeout()) {
- segmentLocalNodeOnSendFail(failedNodes);
-
- return; // Nothing to do here.
- }
boolean failedNextNode = sndState == null || sndState.markNextNodeFailed();
@@ -3894,6 +3878,12 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
+ if (sndState != null && sndState.isFailed()) {
+ segmentLocalNodeOnSendFail(failedNodes);
+
+ return; // Nothing to do here.
+ }
+
next = null;
errs = null;
@@ -6526,19 +6516,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- /**
- * Creates proper timeout helper taking in account current send state.
- *
- * @param sndState Current connection recovering state. Ignored if {@code null}.
- * @param lastOperationNanos Time of last related operation. Ignored if negative or 0.
- * @return Timeout helper.
- */
- private IgniteSpiOperationTimeoutHelper serverOperationTimeoutHelper(@Nullable CrossRingMessageSendState sndState,
- long lastOperationNanos) {
- return new IgniteSpiOperationTimeoutHelper(spi, true, lastOperationNanos,
- sndState == null ? -1 : sndState.failTimeNanos);
- }
-
/** Fixates time of last sent message. */
private void updateLastSentMessageTime() {
lastRingMsgSentTime = System.nanoTime();
@@ -8241,22 +8218,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * Checks if message sending has completely failed due to the timeout. Sets {@code RingMessageSendState#FAILED}
- * if the timeout is reached.
- *
- * @return {@code True} if passed timeout is reached. {@code False} otherwise.
- */
- boolean checkTimeout() {
- if (System.nanoTime() >= failTimeNanos) {
- state = RingMessageSendState.FAILED;
-
- return true;
- }
-
- return false;
- }
-
- /**
* Marks last failed node as alive.
*
* @return {@code False} if all failed nodes marked as alive or incorrect state.
@@ -8268,6 +8229,12 @@ class ServerImpl extends TcpDiscoveryImpl {
if (--failedNodes <= 0) {
failedNodes = 0;
+ if (System.nanoTime() - failTimeNanos >= 0) {
+ state = RingMessageSendState.FAILED;
+
+ return false;
+ }
+
state = RingMessageSendState.STARTING_POINT;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 40e106e..71511d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -2494,12 +2494,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery
LT.warn(log, "Socket write has timed out (consider increasing " +
(failureDetectionTimeoutEnabled() ?
- "'IgniteConfiguration.failureDetectionTimeout' and 'connRecoveryTimeout' configuration " +
- "properties) [failureDetectionTimeout=" + failureDetectionTimeout() :
- "'sockTimeout' and 'connRecoveryTimeout' configuration properties) [sockTimeout="
- + sockTimeout) + ", connRecoveryTimeout=" + getConnectionRecoveryTimeout() + ", rmtAddr="
- + sock.getRemoteSocketAddress() + ", rmtPort=" + sock.getPort() + ", sockTimeout=" + sockTimeout
- + ']');
+ "'IgniteConfiguration.failureDetectionTimeout' configuration property) [" +
+ "failureDetectionTimeout=" + failureDetectionTimeout() :
+ "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout) +
+ ", rmtAddr=" + sock.getRemoteSocketAddress() + ", rmtPort=" + sock.getPort() +
+ ", sockTimeout=" + sockTimeout + ']');
stats.onSocketTimeout();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index fcf510f..16fd751 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -1976,7 +1976,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
ignite1.configuration().getDiscoverySpi().failNode(coordId, null);
- assertTrue(failedLatch.await(ignite1.configuration().getFailureDetectionTimeout(), MILLISECONDS));
+ assertTrue(failedLatch.await(2000, MILLISECONDS));
assertTrue(coordSegmented.get());