You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/16 15:22:59 UTC
[3/4] incubator-ignite git commit: ignite-752: failure detection
threshold support in TcpCommunicationSpi
ignite-752: failure detection threshold support in TcpCommunicationSpi
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0949c93f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0949c93f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0949c93f
Branch: refs/heads/ignite-752
Commit: 0949c93fe1d1dd435b906fb1d79263e9073c0f1b
Parents: c3c0ef8
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Jul 16 13:25:48 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Jul 16 13:25:48 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 20 ++--
.../IgniteSpiOperationTimeoutController.java | 7 +-
.../communication/tcp/TcpCommunicationSpi.java | 106 +++++++++++++++----
.../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +-
4 files changed, 102 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 422ce81..500c461 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -203,15 +203,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
spiCtx = new GridDummySpiContext(locNode, true, spiCtx);
}
- /** {@inheritDoc} */
- @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
- if (!failureDetectionThresholdEnabled) {
- failureDetectionThreshold = ignite.configuration().getFailureDetectionThreshold();
-
- assertParameter(failureDetectionThreshold > 0, "failureDetectionThreshold > 0");
- }
- }
-
/**
* Inject ignite instance.
*/
@@ -579,6 +570,17 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
/**
* TODO: IGNITE-752
+ */
+ protected void initFailureDetectionThreshold() {
+ // Only pick the threshold up from the node configuration while threshold mode is enabled.
+ if (failureDetectionThresholdEnabled) {
+ failureDetectionThreshold = ignite.configuration().getFailureDetectionThreshold();
+
+ // The configured threshold must be strictly positive.
+ assertParameter(failureDetectionThreshold > 0, "failureDetectionThreshold > 0");
+ }
+ }
+
+ /**
+ * TODO: IGNITE-752
* @param enabled
*/
public void failureDetectionThresholdEnabled(boolean enabled) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
index 3ae4fa4..ba95871 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
@@ -46,6 +46,9 @@ public class IgniteSpiOperationTimeoutController {
public IgniteSpiOperationTimeoutController(IgniteSpiAdapter adapter) {
+ // Copy the adapter's threshold settings into this controller at construction time.
failureDetectionThresholdEnabled = adapter.failureDetectionThresholdEnabled();
failureDetectionThreshold = adapter.failureDetectionThreshold();
+
+ // If threshold mode is enabled the threshold itself has to be positive.
+ assert !failureDetectionThresholdEnabled || failureDetectionThreshold > 0 : " [failureDetectionThreshold=" +
+ failureDetectionThreshold + ", thresholdEnabled=" + failureDetectionThresholdEnabled + ']';
}
/**
@@ -70,8 +73,8 @@ public class IgniteSpiOperationTimeoutController {
lastOperStartTs = curTs;
if (timeout <= 0)
- throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase failure detection" +
- " threshold using IgniteConfiguration.setFailureDetectionThreshold() or set SPI specific timeouts" +
+ throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " +
+ "'failureDetectionThreshold' configuration property or set SPI specific timeouts" +
" manually. Current failure detection threshold: " + failureDetectionThreshold);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 4ca2995..b2bc9eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -970,6 +970,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@IgniteSpiConfiguration(optional = true)
public void setConnectTimeout(long connTimeout) {
this.connTimeout = connTimeout;
+
+ // Setting the connect timeout explicitly passes 'false' to the threshold-enabled flag setter.
+ failureDetectionThresholdEnabled(false);
}
/** {@inheritDoc} */
@@ -992,6 +994,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@IgniteSpiConfiguration(optional = true)
public void setMaxConnectTimeout(long maxConnTimeout) {
this.maxConnTimeout = maxConnTimeout;
+
+ // Setting the maximum connect timeout explicitly passes 'false' to the threshold-enabled flag setter.
+ failureDetectionThresholdEnabled(false);
}
/** {@inheritDoc} */
@@ -1010,6 +1014,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@IgniteSpiConfiguration(optional = true)
public void setReconnectCount(int reconCnt) {
this.reconCnt = reconCnt;
+
+ // Setting the reconnect count explicitly passes 'false' to the threshold-enabled flag setter.
+ failureDetectionThresholdEnabled(false);
}
/** {@inheritDoc} */
@@ -1239,6 +1245,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
nodeIdMsg = new NodeIdMessage(getLocalNodeId());
+ initFailureDetectionThreshold();
+
assertParameter(locPort > 1023, "locPort > 1023");
assertParameter(locPort <= 0xffff, "locPort < 0xffff");
assertParameter(locPortRange >= 0, "locPortRange >= 0");
@@ -1247,10 +1255,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0");
assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
- assertParameter(reconCnt > 0, "reconnectCnt > 0");
assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
- assertParameter(connTimeout >= 0, "connTimeout >= 0");
- assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
+
+ if (!failureDetectionThresholdEnabled()) {
+ assertParameter(reconCnt > 0, "reconnectCnt > 0");
+ assertParameter(connTimeout >= 0, "connTimeout >= 0");
+ assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
+ }
+
assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0");
assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0");
assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0");
@@ -1260,7 +1272,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
"Specified 'unackedMsgsBufSize' is too low, it should be at least 'msgQueueLimit * 5'.");
assertParameter(unackedMsgsBufSize >= ackSndThreshold * 5,
- "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'.");
+ "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'.");
}
try {
@@ -1326,9 +1338,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log.debug(configInfo("sockRcvBuf", sockRcvBuf));
log.debug(configInfo("shmemPort", shmemPort));
log.debug(configInfo("msgQueueLimit", msgQueueLimit));
- log.debug(configInfo("connTimeout", connTimeout));
- log.debug(configInfo("maxConnTimeout", maxConnTimeout));
- log.debug(configInfo("reconCnt", reconCnt));
+
+ // connTimeout, maxConnTimeout and reconCnt are validated only when the failure detection
+ // threshold is disabled (setting any of them disables it), so report them in that case
+ // and report the threshold itself otherwise.
+ if (!failureDetectionThresholdEnabled()) {
+ log.debug(configInfo("connTimeout", connTimeout));
+ log.debug(configInfo("maxConnTimeout", maxConnTimeout));
+ log.debug(configInfo("reconCnt", reconCnt));
+ }
+ else
+ log.debug(configInfo("failureDetectionThreshold", failureDetectionThreshold()));
+
log.debug(configInfo("sockWriteTimeout", sockWriteTimeout));
log.debug(configInfo("ackSndThreshold", ackSndThreshold));
log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize));
@@ -1850,17 +1868,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
long connTimeout0 = connTimeout;
+ IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(this);
+
while (true) {
GridCommunicationClient client;
try {
client = new GridShmemCommunicationClient(metricsLsnr,
port,
- connTimeout,
+ timeoutCtrl.nextTimeoutChunk(connTimeout),
log,
getSpiContext().messageFormatter());
}
catch (IgniteCheckedException e) {
+ if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+ throw e;
+
// Reconnect for the second time, if connection is not established.
if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) {
connectAttempts++;
@@ -1872,15 +1895,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
try {
- safeHandshake(client, null, node.id(), connTimeout0);
+ safeHandshake(client, null, node.id(), timeoutCtrl.nextTimeoutChunk(connTimeout0));
}
- catch (HandshakeTimeoutException e) {
+ catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
+ client.forceClose();
+
+ if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException ||
+ timeoutCtrl.checkFailureDetectionThresholdReached(e))) {
+ // Guard the debug call like the other debug statements in this handler, so the
+ // message is not built when debug logging is off.
+ if (log.isDebugEnabled())
+ log.debug("Handshake timed out (failure threshold reached) [failureDetectionThreshold=" +
+ failureDetectionThreshold() + ", err=" + e.getMessage() + ", client=" + client + ']');
+
+ throw e;
+ }
+
+ assert !failureDetectionThresholdEnabled();
+
if (log.isDebugEnabled())
- log.debug("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
+ log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
", err=" + e.getMessage() + ", client=" + client + ']');
- client.forceClose();
-
if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
if (log.isDebugEnabled())
log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
@@ -1889,8 +1922,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
", err=" + e.getMessage() + ", client=" + client + ']');
throw e;
- }
- else {
+ } else {
attempt++;
connTimeout0 *= 2;
@@ -1994,6 +2026,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
int attempt = 1;
+ IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(this);
+
while (!conn) { // Reconnection on handshake timeout.
try {
SocketChannel ch = SocketChannel.open();
@@ -2020,9 +2054,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
long rcvCnt = -1;
try {
- ch.socket().connect(addr, (int)connTimeout);
+ ch.socket().connect(addr, (int)timeoutCtrl.nextTimeoutChunk(connTimeout));
- rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0);
+ rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), timeoutCtrl.nextTimeoutChunk(connTimeout0));
if (rcvCnt == -1)
return null;
@@ -2056,14 +2090,38 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
}
}
- catch (HandshakeTimeoutException e) {
+ catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
if (client != null) {
client.forceClose();
client = null;
}
- onException("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
+ if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException ||
+ timeoutCtrl.checkFailureDetectionThresholdReached(e))) {
+
+ String msg = "Handshake timed out (failure detection threshold is reached) " +
+ "[failureDetectionThreshold=" + failureDetectionThreshold() + ", addr=" + addr + ']';
+
+ onException(msg, e);
+
+ if (log.isDebugEnabled())
+ log.debug(msg);
+
+ if (errs == null)
+ errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
+ "Make sure that each GridComputeTask and GridCacheTransaction has a timeout set " +
+ "in order to prevent parties from waiting forever in case of network issues " +
+ "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+
+ errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+
+ break;
+ }
+
+ assert !failureDetectionThresholdEnabled();
+
+ onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
", addr=" + addr + ']', e);
if (log.isDebugEnabled())
@@ -2108,7 +2166,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (log.isDebugEnabled())
log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
- if (X.hasCause(e, SocketTimeoutException.class))
+ boolean failureDetThrReached = timeoutCtrl.checkFailureDetectionThresholdReached(e);
+
+ if (failureDetThrReached)
+ LT.warn(log, null, "Connect timed out (consider increasing 'failureDetectionThreshold' " +
+ "configuration property) [addr=" + addr + ", failureDetectionThreshold=" +
+ failureDetectionThreshold() + ']');
+ else if (X.hasCause(e, SocketTimeoutException.class))
LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " +
"configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']');
@@ -2121,7 +2185,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
// Reconnect for the second time, if connection is not established.
- if (connectAttempts < 2 &&
+ if (!failureDetThrReached && connectAttempts < 2 &&
(e instanceof ConnectException || X.hasCause(e, ConnectException.class))) {
connectAttempts++;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 5ed946b..5be7ab9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1560,7 +1560,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** {@inheritDoc} */
@Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
- super.spiStart(gridName);
+ initFailureDetectionThreshold();
if (!failureDetectionThresholdEnabled()) {
if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) {