You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/21 15:33:12 UTC
incubator-ignite git commit: ignite-752: network timeout must not affect failure detection threshold
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-752 a371f049b -> 9319206bd
ignite-752: network timeout must not affect failure detection threshold
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9319206b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9319206b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9319206b
Branch: refs/heads/ignite-752
Commit: 9319206bd49146c45a4256753eb2768042011d22
Parents: a371f04
Author: Denis Magda <dm...@gridgain.com>
Authored: Tue Jul 21 16:33:04 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Tue Jul 21 16:33:04 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 56 ++++++-----------
.../ignite/spi/discovery/tcp/ServerImpl.java | 65 +++++++++-----------
.../spi/discovery/tcp/TcpDiscoverySpi.java | 6 +-
3 files changed, 50 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9319206b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 00c668a..196c1b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -203,8 +203,7 @@ class ClientImpl extends TcpDiscoveryImpl {
msgWorker.addMessage(SPI_STOP);
try {
- if (!leaveLatch.await(spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
- spi.getNetworkTimeout(), MILLISECONDS))
+ if (!leaveLatch.await(spi.netTimeout, MILLISECONDS))
U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']');
}
catch (InterruptedException ignored) {
@@ -294,8 +293,7 @@ class ClientImpl extends TcpDiscoveryImpl {
finalFut.onDone(false);
}
}
- }, spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
- spi.getNetworkTimeout());
+ }, spi.netTimeout);
sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
}
@@ -1076,8 +1074,7 @@ class ClientImpl extends TcpDiscoveryImpl {
Exception err = null;
- long timeout = join ? spi.getJoinTimeout() : (spi.failureDetectionThresholdEnabled() ?
- spi.failureDetectionThreshold() : spi.getNetworkTimeout());
+ long timeout = join ? spi.joinTimeout : spi.netTimeout;
long startTime = U.currentTimeMillis();
@@ -1092,15 +1089,11 @@ class ClientImpl extends TcpDiscoveryImpl {
if (join) {
joinError(new IgniteSpiException("Join process timed out, connection failed and " +
"failed to reconnect (consider increasing 'joinTimeout' configuration property) " +
- "[networkTimeout=" + spi.getJoinTimeout() + ", sock=" + sock + ']'));
+ "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
}
else
- U.error(log, "Failed to reconnect to cluster " +
- (spi.failureDetectionThresholdEnabled() ?
- "(consider increasing 'failureDetectionThreshold' configuration property) " +
- "[failureDetectionThreshold=" + spi.failureDetectionThreshold() :
- "(consider increasing 'networkTimeout' configuration property) [networkTimeout=" +
- spi.getNetworkTimeout()) + ", sock=" + sock + ']');
+ U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout'" +
+ " configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
return;
}
@@ -1116,8 +1109,7 @@ class ClientImpl extends TcpDiscoveryImpl {
try {
oldTimeout = sock.getSoTimeout();
- sock.setSoTimeout((int)(spi.failureDetectionThresholdEnabled() ?
- spi.failureDetectionThreshold() : spi.getNetworkTimeout()));
+ sock.setSoTimeout((int)spi.netTimeout);
InputStream in = new BufferedInputStream(sock.getInputStream());
@@ -1127,8 +1119,6 @@ class ClientImpl extends TcpDiscoveryImpl {
List<TcpDiscoveryAbstractMessage> msgs = null;
while (!isInterrupted()) {
- // How to deal with failure detection threshold? It's bigger then networkTimeout and
- // we can't just decrease it on every unmarshal operation.
TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
if (msg instanceof TcpDiscoveryClientReconnectMessage) {
@@ -1174,15 +1164,11 @@ class ClientImpl extends TcpDiscoveryImpl {
if (join)
msg = "Failed to connect to cluster (consider increasing 'joinTimeout' " +
- "configuration property) [joinTimeout=" + spi.getJoinTimeout() + ", err=" + e +
- ']';
+ "configuration property) [joinTimeout=" + spi.joinTimeout + ", err=" + e + ']';
else
- msg = "Failed to reconnect to cluster " +
- (spi.failureDetectionThresholdEnabled() ?
- "(consider increasing 'failureDetectionThreshold' configuration property) " +
- "[failureDetectionThreshold=" + spi.failureDetectionThreshold() :
- "(consider increasing 'networkTimeout' configuration property) " +
- "[networkTimeout=" + spi.getNetworkTimeout()) + ", sock=" + sock + ']';
+ msg = "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
+ "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock
+ + ']';
U.warn(log, msg);
@@ -1260,14 +1246,14 @@ class ClientImpl extends TcpDiscoveryImpl {
if (state == STARTING) {
joinError(new IgniteSpiException("Join process timed out, did not receive response for " +
"join request (consider increasing 'joinTimeout' configuration property) " +
- "[joinTimeout=" + spi.getJoinTimeout() + ", sock=" + currSock + ']'));
+ "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']'));
break;
}
else if (state == DISCONNECTED) {
if (log.isDebugEnabled())
log.debug("Failed to reconnect, local node segmented " +
- "[joinTimeout=" + spi.getJoinTimeout() + ']');
+ "[joinTimeout=" + spi.joinTimeout + ']');
state = SEGMENTED;
@@ -1326,10 +1312,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (state != SEGMENTED && state != STOPPED) {
if (log.isDebugEnabled()) {
log.debug("Failed to restore closed connection, reconnect disabled, " +
- (spi.failureDetectionThresholdEnabled() ?
- "local node segmented [failureDetectionThreshold=" +
- spi.failureDetectionThreshold() + ']' :
- "local node segmented [networkTimeout=" + spi.getNetworkTimeout() + ']'));
+ "local node segmented [networkTimeout=" + spi.netTimeout + ']');
}
state = SEGMENTED;
@@ -1341,10 +1324,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (state == STARTING || state == CONNECTED) {
if (log.isDebugEnabled()) {
log.debug("Failed to restore closed connection, will try to reconnect " +
- (spi.failureDetectionThresholdEnabled() ?
- "[failureDetectionThreshold=" + spi.failureDetectionThreshold() :
- "[networkTimeout=" + spi.getNetworkTimeout()) +
- ", joinTimeout=" + spi.getJoinTimeout() + ']');
+ "[networkTimeout=" + spi.netTimeout + ", joinTimeout=" + spi.joinTimeout + ']');
}
state = DISCONNECTED;
@@ -1433,7 +1413,7 @@ class ClientImpl extends TcpDiscoveryImpl {
joinCnt++;
- T2<Socket, Boolean> joinRes = joinTopology(false, spi.getJoinTimeout());
+ T2<Socket, Boolean> joinRes = joinTopology(false, spi.joinTimeout);
if (joinRes == null) {
if (join)
@@ -1451,7 +1431,7 @@ class ClientImpl extends TcpDiscoveryImpl {
sockWriter.setSocket(joinRes.get1(), joinRes.get2());
- if (spi.getJoinTimeout() > 0) {
+ if (spi.joinTimeout > 0) {
final int joinCnt0 = joinCnt;
timer.schedule(new TimerTask() {
@@ -1459,7 +1439,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (joinCnt == joinCnt0 && joining())
queue.add(JOIN_TIMEOUT);
}
- }, spi.getJoinTimeout());
+ }, spi.joinTimeout);
}
sockReader.setSocket(joinRes.get1(), locNode.clientRouterNodeId());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9319206b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index b475768..02e13ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -278,8 +278,7 @@ class ServerImpl extends TcpDiscoveryImpl {
msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNode.id()));
synchronized (mux) {
- long timeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
- spi.getNetworkTimeout();
+ long timeout = spi.netTimeout;
long threshold = U.currentTimeMillis() + timeout;
@@ -535,7 +534,7 @@ class ServerImpl extends TcpDiscoveryImpl {
timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutCtrl.nextTimeoutChunk(
- spi.getNetworkTimeout()));
+ spi.getAckTimeout()));
if (locNodeId.equals(res.creatorNodeId())) {
if (log.isDebugEnabled())
@@ -701,8 +700,7 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Join request message has been sent (waiting for coordinator response).");
synchronized (mux) {
- long timeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
- spi.getNetworkTimeout();
+ long timeout = spi.netTimeout;
long threshold = U.currentTimeMillis() + timeout;
@@ -746,9 +744,8 @@ class ServerImpl extends TcpDiscoveryImpl {
LT.warn(log, null, "Node has not been connected to topology and will repeat join process. " +
"Check remote nodes logs for possible error messages. " +
"Note that large topology may require significant time to start. " +
- "Increase 'IgniteConfiguration.failureDetectionThreshold' configuration property " +
- "if getting this message on the starting nodes [failureDetectionThreshold=" +
- spi.failureDetectionThreshold() + ']');
+ "Increase 'TcpDiscoverySpi.networkTimeout' configuration property " +
+ "if getting this message on the starting nodes [networkTimeout=" + spi.netTimeout + ']');
}
}
@@ -868,10 +865,10 @@ class ServerImpl extends TcpDiscoveryImpl {
"(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " +
addrs);
- if (spi.getJoinTimeout() > 0) {
+ if (spi.joinTimeout > 0) {
if (noResStart == 0)
noResStart = U.currentTimeMillis();
- else if (U.currentTimeMillis() - noResStart > spi.getJoinTimeout())
+ else if (U.currentTimeMillis() - noResStart > spi.joinTimeout)
throw new IgniteSpiException(
"Failed to connect to any address from IP finder within join timeout " +
"(make sure IP finder addresses are correct, and operating system firewalls are disabled " +
@@ -2263,7 +2260,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Handshake.
writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId),
- timeoutCtrl.nextTimeoutChunk(spi.getNetworkTimeout()));
+ timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
timeoutCtrl.nextTimeoutChunk(ackTimeout0));
@@ -3829,27 +3826,29 @@ class ServerImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("Responded to status check message " +
- "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']');
- }
- catch (IgniteSpiException e) {
+ "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status
+ () + ']');
+ } catch (IgniteSpiException e) {
if (e.hasCause(SocketException.class)) {
if (log.isDebugEnabled())
log.debug("Failed to respond to status check message (connection " +
- "refused) [recipient=" + msg.creatorNodeId() + ", status=" +
- msg.status() + ']');
+ "refused) [recipient=" + msg.creatorNodeId() + ", status=" +
+ msg.status() + ']');
onException("Failed to respond to status check message (connection refused) " +
- "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']', e);
- }
- else {
+ "[recipient=" + msg.creatorNodeId() + ", status=" + msg
+ .status() + ']',
+ e);
+ } else {
if (pingNode(msg.creatorNode()))
// Node exists and accepts incoming connections.
U.error(log, "Failed to respond to status check message [recipient=" +
msg.creatorNodeId() + ", status=" + msg.status() + ']', e);
else if (log.isDebugEnabled())
log.debug("Failed to respond to status check message (did the node " +
- "stop?) [recipient=" + msg.creatorNodeId() + ", status=" + msg.status()
- + ']');
+ "stop?) [recipient=" + msg.creatorNodeId() + ", status=" +
+ msg.status()
+ + ']');
}
}
}
@@ -4407,8 +4406,6 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
InputStream in;
- IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
-
try {
// Set socket options.
sock.setKeepAlive(true);
@@ -4416,7 +4413,7 @@ class ServerImpl extends TcpDiscoveryImpl {
int timeout = sock.getSoTimeout();
- sock.setSoTimeout((int)timeoutCtrl.nextTimeoutChunk(spi.getNetworkTimeout()));
+ sock.setSoTimeout((int)spi.netTimeout);
for (IgniteInClosure<Socket> connLsnr : spi.incomeConnLsnrs)
connLsnr.apply(sock);
@@ -4461,8 +4458,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Restore timeout.
sock.setSoTimeout(timeout);
- TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, timeoutCtrl.nextTimeoutChunk(
- spi.getNetworkTimeout()));
+ TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, spi.netTimeout);
// Ping.
if (msg instanceof TcpDiscoveryPingRequest) {
@@ -4471,6 +4467,9 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId);
+ IgniteSpiOperationTimeoutController timeoutCtrl =
+ new IgniteSpiOperationTimeoutController(spi);
+
if (req.clientNodeId() != null) {
ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId());
@@ -4486,8 +4485,6 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
-
-
// Handshake.
TcpDiscoveryHandshakeRequest req = (TcpDiscoveryHandshakeRequest)msg;
@@ -4501,7 +4498,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if (req.client())
res.clientAck(true);
- spi.writeToSocket(sock, res, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
+ spi.writeToSocket(sock, res, spi.failureDetectionThresholdEnabled() ?
+ spi.failureDetectionThreshold() : spi.getSocketTimeout());
// It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
// the local node sends a handshake request message on the loopback address, so we get here.
@@ -4591,15 +4589,10 @@ class ServerImpl extends TcpDiscoveryImpl {
onException("Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
- if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
- LT.warn(log, null, "Socket operations timed out " +
- "(consider increasing 'failureDetectionThreshold' configuration property) " +
- "[failureDetectionThreshold=" + spi.failureDetectionThreshold() + ']');
-
- else if (!spi.failureDetectionThresholdEnabled() && e.hasCause(SocketTimeoutException.class))
+ if (e.hasCause(SocketTimeoutException.class))
LT.warn(log, null, "Socket operation timed out on handshake " +
"(consider increasing 'networkTimeout' configuration property) " +
- "[netTimeout=" + spi.getNetworkTimeout() + ']');
+ "[netTimeout=" + spi.netTimeout + ']');
else if (e.hasCause(ClassNotFoundException.class))
LT.warn(log, null, "Failed to read message due to ClassNotFoundException " +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9319206b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 035ac1a..87848d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -239,11 +239,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
private long ackTimeout; // Must be initialized in the constructor of child class.
/** Network timeout. */
- private long netTimeout = DFLT_NETWORK_TIMEOUT;
+ protected long netTimeout = DFLT_NETWORK_TIMEOUT;
/** Join timeout. */
@SuppressWarnings("RedundantFieldInitialization")
- private long joinTimeout = DFLT_JOIN_TIMEOUT;
+ protected long joinTimeout = DFLT_JOIN_TIMEOUT;
/** Thread priority for all threads started by SPI. */
protected int threadPri = DFLT_THREAD_PRI;
@@ -1666,7 +1666,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
}
if (!failureDetectionThresholdEnabled()) {
- assertParameter(netTimeout > 0, "networkTimeout > 0");
assertParameter(sockTimeout > 0, "sockTimeout > 0");
assertParameter(ackTimeout > 0, "ackTimeout > 0");
assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout");
@@ -1675,6 +1674,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
else
assertParameter(connCheckFreq < failureDetectionThreshold(), "failureDetectionThreshold > connCheckFreq");
+ assertParameter(netTimeout > 0, "networkTimeout > 0");
assertParameter(ipFinder != null, "ipFinder != null");
assertParameter(hbFreq > 0, "heartbeatFreq > 0");