You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/16 15:22:57 UTC
[1/4] incubator-ignite git commit: ignite-752: supported failure detection threshold in ServerImpl
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-752 1b6005a50 -> 5fc818c05
ignite-752: supported failure detection threshold in ServerImpl
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8e37d32a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8e37d32a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8e37d32a
Branch: refs/heads/ignite-752
Commit: 8e37d32a388b5a6b6a1cd7cacbcd70f8db9dc2c2
Parents: 1b6005a
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Jul 16 10:53:36 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Jul 16 10:53:36 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 129 +++++++++++++------
.../spi/discovery/tcp/TcpDiscoverySpi.java | 6 +-
2 files changed, 92 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e37d32a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index d506507..fca3104 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -474,6 +474,8 @@ class ServerImpl extends TcpDiscoveryImpl {
UUID locNodeId = getLocalNodeId();
+ IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+
if (F.contains(spi.locNodeAddrs, addr)) {
if (clientNodeId == null)
return F.t(getLocalNodeId(), false);
@@ -486,7 +488,7 @@ class ServerImpl extends TcpDiscoveryImpl {
boolean clientPingRes;
try {
- clientPingRes = clientWorker.ping();
+ clientPingRes = clientWorker.ping(timeoutCtrl);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -509,8 +511,6 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
Socket sock = null;
- IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
-
int reconCnt = 0;
while (true) {
@@ -899,6 +899,8 @@ class ServerImpl extends TcpDiscoveryImpl {
IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+ int reconCnt = 0;
+
while (true){
// Need to set to false on each new iteration,
// since remote node may leave in the middle of the first iteration.
@@ -980,6 +982,9 @@ class ServerImpl extends TcpDiscoveryImpl {
if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
break;
+ if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
+ break;
+
if (!openSock) {
// Reconnect for the second time, if connection is not established.
if (connectAttempts < 2) {
@@ -2041,10 +2046,14 @@ class ServerImpl extends TcpDiscoveryImpl {
continue;
}
+ int reconCnt = 0;
+
+ IgniteSpiOperationTimeoutController timeoutCtrl = null;
+
while (true) {
if (sock == null) {
- IgniteSpiOperationTimeoutController timeoutCrt =
- new IgniteSpiOperationTimeoutController(spi);
+ if (timeoutCtrl == null)
+ timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
nextNodeExists = false;
@@ -2056,16 +2065,16 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
long tstamp = U.currentTimeMillis();
- sock = spi.openSocket(addr, timeoutCrt);
+ sock = spi.openSocket(addr, timeoutCtrl);
openSock = true;
// Handshake.
writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId),
- timeoutCrt.nextTimeoutChunk(spi.getNetworkTimeout()));
+ timeoutCtrl.nextTimeoutChunk(spi.getNetworkTimeout()));
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
- timeoutCrt.nextTimeoutChunk(ackTimeout0));
+ timeoutCtrl.nextTimeoutChunk(ackTimeout0));
if (locNodeId.equals(res.creatorNodeId())) {
if (log.isDebugEnabled())
@@ -2149,7 +2158,10 @@ class ServerImpl extends TcpDiscoveryImpl {
if (!openSock)
break; // Don't retry if we can not establish connection.
- if (timeoutCrt.checkFailureDetectionThresholdReached(e))
+ if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
+ break;
+
+ if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
break;
else if (!spi.failureDetectionThresholdEnabled() && (e instanceof
SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) {
@@ -2167,9 +2179,13 @@ class ServerImpl extends TcpDiscoveryImpl {
sock = null;
}
- else
+ else {
// Next node exists and accepts incoming messages.
nextNodeExists = true;
+ // Resetting timeout control object to let the code below to use a new one
+ // for the next bunch of operations.
+ timeoutCtrl = null;
+ }
}
}
@@ -2182,8 +2198,6 @@ class ServerImpl extends TcpDiscoveryImpl {
assert !forceSndPending || msg instanceof TcpDiscoveryNodeLeftMessage;
- IgniteSpiOperationTimeoutController timeoutCtrl;
-
if (failure || forceSndPending) {
if (log.isDebugEnabled())
log.debug("Pending messages will be sent [failure=" + failure +
@@ -2196,8 +2210,6 @@ class ServerImpl extends TcpDiscoveryImpl {
boolean skip = pendingMsgs.discardId != null;
for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
- timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
-
if (skip) {
if (pendingMsg.id().equals(pendingMsgs.discardId))
skip = false;
@@ -2210,6 +2222,9 @@ class ServerImpl extends TcpDiscoveryImpl {
prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
pendingMsgs.discardId);
+ if (timeoutCtrl == null)
+ timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+
try {
writeToSocket(sock, pendingMsg, timeoutCtrl.nextTimeoutChunk(
spi.getSocketTimeout()));
@@ -2231,20 +2246,26 @@ class ServerImpl extends TcpDiscoveryImpl {
debugLog("Pending message has been sent to next node [msg=" + msg.id() +
", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
", res=" + res + ']');
+
+ // Resetting timeout control object to create a new one for the next bunch of
+ // operations.
+ timeoutCtrl = null;
}
}
prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
- timeoutCtrl
try {
long tstamp = U.currentTimeMillis();
- writeToSocket(sock, msg);
+ if (timeoutCtrl == null)
+ timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+
+ writeToSocket(sock, msg, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
- int res = spi.readReceipt(sock, ackTimeout0);
+ int res = spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0));
if (log.isDebugEnabled())
log.debug("Message has been sent to next node [msg=" + msg +
@@ -2279,11 +2300,19 @@ class ServerImpl extends TcpDiscoveryImpl {
onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']',
e);
- if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
- ackTimeout0 *= 2;
+ if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+ break;
- if (!checkAckTimeout(ackTimeout0))
+ if (!spi.failureDetectionThresholdEnabled()) {
+ if (++reconCnt == spi.getReconnectCount())
break;
+ else if (e instanceof SocketTimeoutException ||
+ X.hasCause(e, SocketTimeoutException.class)) {
+ ackTimeout0 *= 2;
+
+ if (!checkAckTimeout(ackTimeout0))
+ break;
+ }
}
}
finally {
@@ -2296,7 +2325,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("Message has not been sent [next=" + next.id() + ", msg=" + msg +
- ", i=" + i + ']');
+ (!spi.failureDetectionThresholdEnabled() ? ", i=" + reconCnt : "") + ']');
}
}
} // Try to reconnect.
@@ -3360,7 +3389,8 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else if (leftNode.equals(next) && sock != null) {
try {
- writeToSocket(sock, msg);
+ writeToSocket(sock, msg, spi.failureDetectionThresholdEnabled() ?
+ spi.failureDetectionThreshold() : spi.getSocketTimeout());
if (log.isDebugEnabled())
log.debug("Sent verified node left message to leaving node: " + msg);
@@ -4119,6 +4149,8 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
InputStream in;
+ IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+
try {
// Set socket options.
sock.setKeepAlive(true);
@@ -4126,7 +4158,7 @@ class ServerImpl extends TcpDiscoveryImpl {
int timeout = sock.getSoTimeout();
- sock.setSoTimeout((int)spi.netTimeout);
+ sock.setSoTimeout((int)timeoutCtrl.nextTimeoutChunk(spi.getNetworkTimeout()));
for (IgniteInClosure<Socket> connLsnr : spi.incomeConnLsnrs)
connLsnr.apply(sock);
@@ -4171,7 +4203,8 @@ class ServerImpl extends TcpDiscoveryImpl {
// Restore timeout.
sock.setSoTimeout(timeout);
- TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, spi.netTimeout);
+ TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, timeoutCtrl.nextTimeoutChunk(
+ spi.getNetworkTimeout()));
// Ping.
if (msg instanceof TcpDiscoveryPingRequest) {
@@ -4184,10 +4217,10 @@ class ServerImpl extends TcpDiscoveryImpl {
ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId());
if (clientWorker != null)
- res.clientExists(clientWorker.ping());
+ res.clientExists(clientWorker.ping(timeoutCtrl));
}
- spi.writeToSocket(sock, res);
+ spi.writeToSocket(sock, res, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
}
else if (log.isDebugEnabled())
log.debug("Ignore ping request, node is stopping.");
@@ -4205,7 +4238,7 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryHandshakeResponse res =
new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder());
- spi.writeToSocket(sock, res);
+ spi.writeToSocket(sock, res, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
// It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
// the local node sends a handshake request message on the loopback address, so we get here.
@@ -4295,10 +4328,15 @@ class ServerImpl extends TcpDiscoveryImpl {
onException("Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
- if (e.hasCause(SocketTimeoutException.class))
+ if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+ LT.warn(log, null, "Socket operations timed out " +
+ "(consider increasing 'failureDetectionThreshold' configuration property) " +
+ "[failureDetectionThreshold=" + spi.failureDetectionThreshold() + ']');
+
+ else if (!spi.failureDetectionThresholdEnabled() && e.hasCause(SocketTimeoutException.class))
LT.warn(log, null, "Socket operation timed out on handshake " +
"(consider increasing 'networkTimeout' configuration property) " +
- "[netTimeout=" + spi.netTimeout + ']');
+ "[netTimeout=" + spi.getNetworkTimeout() + ']');
else if (e.hasCause(ClassNotFoundException.class))
LT.warn(log, null, "Failed to read message due to ClassNotFoundException " +
@@ -4314,6 +4352,9 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
+ long socketTimeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
+ spi.getSocketTimeout();
+
while (!isInterrupted()) {
try {
TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
@@ -4346,7 +4387,7 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoverySpiState state = spiStateCopy();
if (state == CONNECTED) {
- spi.writeToSocket(msg, sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW)
clientMsgWrk.start();
@@ -4356,7 +4397,7 @@ class ServerImpl extends TcpDiscoveryImpl {
continue;
}
else {
- spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN);
+ spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, socketTimeout);
break;
}
@@ -4364,7 +4405,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
// Send receipt back.
- spi.writeToSocket(msg, sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
boolean ignored = false;
@@ -4393,7 +4434,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else if (msg instanceof TcpDiscoveryAuthFailedMessage) {
// Send receipt back.
- spi.writeToSocket(msg, sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
boolean ignored = false;
@@ -4422,7 +4463,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else if (msg instanceof TcpDiscoveryCheckFailedMessage) {
// Send receipt back.
- spi.writeToSocket(msg, sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
boolean ignored = false;
@@ -4451,7 +4492,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) {
// Send receipt back.
- spi.writeToSocket(msg, sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
boolean ignored = false;
@@ -4493,7 +4534,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Send receipt back.
if (clientMsgWrk == null)
- spi.writeToSocket(msg, sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
@@ -4594,8 +4635,11 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoverySpiState state = spiStateCopy();
+ long socketTimeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
+ spi.getSocketTimeout();
+
if (state == CONNECTED) {
- spi.writeToSocket(msg, sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
if (log.isDebugEnabled())
log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']');
@@ -4629,7 +4673,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Local node is stopping. Remote node should try next one.
res = RES_CONTINUE_JOIN;
- spi.writeToSocket(msg, sock, res);
+ spi.writeToSocket(msg, sock, res, socketTimeout);
if (log.isDebugEnabled())
log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']');
@@ -4751,7 +4795,8 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
prepareNodeAddedMessage(msg, clientNodeId, null, null);
- writeToSocket(sock, msg);
+ writeToSocket(sock, msg, spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
+ spi.getSocketTimeout());
}
finally {
clearNodeAddedMessage(msg);
@@ -4784,10 +4829,11 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+ * @param timeoutCtrl Timeout controller.
* @return Ping result.
* @throws InterruptedException If interrupted.
*/
- public boolean ping() throws InterruptedException {
+ public boolean ping(IgniteSpiOperationTimeoutController timeoutCtrl) throws InterruptedException {
if (spi.isNodeStopping0())
return false;
@@ -4813,7 +4859,8 @@ class ServerImpl extends TcpDiscoveryImpl {
}
try {
- return fut.get(spi.ackTimeout, TimeUnit.MILLISECONDS);
+ return fut.get(timeoutCtrl.nextTimeoutChunk(spi.getAckTimeout()),
+ TimeUnit.MILLISECONDS);
}
catch (IgniteInterruptedCheckedException ignored) {
throw new InterruptedException();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e37d32a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 126bf03..5ed946b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1248,13 +1248,15 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
* @param msg Received message.
* @param sock Socket.
* @param res Integer response.
+ * @param timeout Socket timeout.
* @throws IOException If IO failed or write timed out.
*/
@SuppressWarnings("ThrowFromFinallyBlock")
- protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res) throws IOException {
+ protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout)
+ throws IOException {
assert sock != null;
- SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
+ SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
addTimeoutObject(obj);
[2/4] incubator-ignite git commit: ignite-752: support of failure detection threshold in ClientImpl
Posted by sb...@apache.org.
ignite-752: support of failure detection threshold in ClientImpl
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c3c0ef87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c3c0ef87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c3c0ef87
Branch: refs/heads/ignite-752
Commit: c3c0ef87be79d71aa922d9a11e2d205df19a7829
Parents: 8e37d32
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Jul 16 11:52:57 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Jul 16 11:52:57 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 83 ++++++++++++++------
.../tcp/TcpClientDiscoverySpiSelfTest.java | 14 ++--
2 files changed, 69 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c3c0ef87/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 3f05f59..62b2e35 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -194,7 +194,8 @@ class ClientImpl extends TcpDiscoveryImpl {
msgWorker.addMessage(SPI_STOP);
try {
- if (!leaveLatch.await(spi.netTimeout, MILLISECONDS))
+ if (!leaveLatch.await(spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
+ spi.getNetworkTimeout(), MILLISECONDS))
U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']');
}
catch (InterruptedException ignored) {
@@ -272,7 +273,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (pingFuts.remove(nodeId, finalFut))
finalFut.onDone(false);
}
- }, spi.netTimeout);
+ }, spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : spi.getNetworkTimeout());
sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
}
@@ -456,13 +457,17 @@ class ClientImpl extends TcpDiscoveryImpl {
Collection<Throwable> errs = null;
- long ackTimeout0 = spi.ackTimeout;
+ long ackTimeout0 = spi.getAckTimeout();
+
+ int reconCnt = 0;
int connectAttempts = 1;
UUID locNodeId = getLocalNodeId();
- for (int i = 0; i < spi.reconCnt; i++) {
+ IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+
+ while (true) {
boolean openSock = false;
Socket sock = null;
@@ -470,7 +475,7 @@ class ClientImpl extends TcpDiscoveryImpl {
try {
long tstamp = U.currentTimeMillis();
- sock = spi.openSocket(addr);
+ sock = spi.openSocket(addr, timeoutCtrl);
openSock = true;
@@ -478,7 +483,7 @@ class ClientImpl extends TcpDiscoveryImpl {
req.client(true);
- spi.writeToSocket(sock, req);
+ spi.writeToSocket(sock, req, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
@@ -499,7 +504,7 @@ class ClientImpl extends TcpDiscoveryImpl {
msg.client(true);
- spi.writeToSocket(sock, msg);
+ spi.writeToSocket(sock, msg, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
@@ -522,6 +527,12 @@ class ClientImpl extends TcpDiscoveryImpl {
errs.add(e);
+ if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+ break;
+
+ if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
+ break;
+
if (!openSock) {
// Reconnect for the second time, if connection is not established.
if (connectAttempts < 2) {
@@ -533,7 +544,8 @@ class ClientImpl extends TcpDiscoveryImpl {
break; // Don't retry if we can not establish connection.
}
- if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
+ if (!spi.failureDetectionThresholdEnabled() && (e instanceof SocketTimeoutException ||
+ X.hasCause(e, SocketTimeoutException.class))) {
ackTimeout0 *= 2;
if (!checkAckTimeout(ackTimeout0))
@@ -825,11 +837,17 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
+ /** */
+ private final long socketTimeout;
+
/**
*
*/
protected SocketWriter() {
super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);
+
+ socketTimeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
+ spi.getSocketTimeout();
}
/**
@@ -893,7 +911,7 @@ class ClientImpl extends TcpDiscoveryImpl {
msgLsnr.apply(msg);
try {
- spi.writeToSocket(sock, msg);
+ spi.writeToSocket(sock, msg, socketTimeout);
msg = null;
}
@@ -954,7 +972,8 @@ class ClientImpl extends TcpDiscoveryImpl {
Exception err = null;
- long timeout = join ? spi.joinTimeout : spi.netTimeout;
+ long timeout = join ? spi.getJoinTimeout() : (spi.failureDetectionThresholdEnabled() ?
+ spi.failureDetectionThreshold() : spi.getNetworkTimeout());
long startTime = U.currentTimeMillis();
@@ -966,11 +985,17 @@ class ClientImpl extends TcpDiscoveryImpl {
if (join) {
joinError(new IgniteSpiException("Join process timed out, connection failed and " +
"failed to reconnect (consider increasing 'joinTimeout' configuration property) " +
- "[networkTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+ "[networkTimeout=" + spi.getJoinTimeout() + ", sock=" + sock + ']'));
}
else
- U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
- "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
+ U.error(log, "Failed to reconnect to cluster " +
+ (spi.failureDetectionThresholdEnabled() ?
+ "(consider increasing 'failureDetectionThreshold' configuration property) " +
+ "[failureDetectionThreshold=" + spi.failureDetectionThreshold()
+ :
+ "(consider increasing 'networkTimeout' configuration property) [networkTimeout="
+ + spi.getNetworkTimeout())
+ + ", sock=" + sock + ']');
return;
}
@@ -983,7 +1008,8 @@ class ClientImpl extends TcpDiscoveryImpl {
try {
oldTimeout = sock.getSoTimeout();
- sock.setSoTimeout((int)spi.netTimeout);
+ sock.setSoTimeout((int)(spi.failureDetectionThresholdEnabled() ?
+ spi.failureDetectionThreshold() : spi.getNetworkTimeout()));
InputStream in = new BufferedInputStream(sock.getInputStream());
@@ -993,6 +1019,8 @@ class ClientImpl extends TcpDiscoveryImpl {
List<TcpDiscoveryAbstractMessage> msgs = null;
while (!isInterrupted()) {
+ // How to deal with failure detection threshold? It's bigger then networkTimeout and
+ // we can't just decrease it on every unmarshal operation.
TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
if (msg instanceof TcpDiscoveryClientReconnectMessage) {
@@ -1028,10 +1056,21 @@ class ClientImpl extends TcpDiscoveryImpl {
log.error("Reconnect error [join=" + join + ", timeout=" + timeout + ']', e);
if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout) {
- String msg = join ? "Failed to connect to cluster (consider increasing 'joinTimeout' " +
- "configuration property) [joinTimeout=" + spi.joinTimeout + ", err=" + e + ']' :
- "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
- "configuration property) [networkTimeout=" + spi.netTimeout + ", err=" + e + ']';
+ String msg;
+
+ if (join)
+ msg = "Failed to connect to cluster (consider increasing 'joinTimeout' " +
+ "configuration property) [joinTimeout=" + spi.getJoinTimeout() + ", err=" + e +
+ ']';
+ else
+ msg = "Failed to reconnect to cluster " +
+ (spi.failureDetectionThresholdEnabled() ?
+ "(consider increasing 'failureDetectionThreshold' configuration property) " +
+ "[failureDetectionThreshold=" + spi.failureDetectionThreshold()
+ :
+ "(consider increasing 'networkTimeout' configuration property) " +
+ "[networkTimeout=" + spi.getNetworkTimeout()) + ", sock=" + sock + ']';
+
U.warn(log, msg);
@@ -1094,7 +1133,7 @@ class ClientImpl extends TcpDiscoveryImpl {
spi.stats.onJoinStarted();
try {
- final Socket sock = joinTopology(false, spi.joinTimeout);
+ final Socket sock = joinTopology(false, spi.getJoinTimeout());
if (sock == null) {
joinError(new IgniteSpiException("Join process timed out."));
@@ -1106,13 +1145,13 @@ class ClientImpl extends TcpDiscoveryImpl {
sockWriter.setSocket(sock);
- if (spi.joinTimeout > 0) {
+ if (spi.getJoinTimeout() > 0) {
timer.schedule(new TimerTask() {
@Override public void run() {
if (joinLatch.getCount() > 0)
queue.add(JOIN_TIMEOUT);
}
- }, spi.joinTimeout);
+ }, spi.getJoinTimeout());
}
sockReader.setSocket(sock, locNode.clientRouterNodeId());
@@ -1124,7 +1163,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (joinLatch.getCount() > 0) {
joinError(new IgniteSpiException("Join process timed out, did not receive response for " +
"join request (consider increasing 'joinTimeout' configuration property) " +
- "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+ "[joinTimeout=" + spi.getJoinTimeout() + ", sock=" + sock + ']'));
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c3c0ef87/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index ec6a526..c8f3942 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -1538,7 +1538,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
- GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException {
+ GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
waitFor(writeLock);
boolean fail = false;
@@ -1556,17 +1556,18 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
sock.close();
}
- super.writeToSocket(sock, msg, bout);
+ super.writeToSocket(sock, msg, bout, timeout);
if (afterWrite != null)
afterWrite.apply(msg, sock);
}
/** {@inheritDoc} */
- @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
+ @Override protected Socket openSocket(InetSocketAddress sockAddr,
+ IgniteSpiOperationTimeoutController timeoutCtrl) throws IOException, IgniteSpiOperationTimeoutException {
waitFor(openSockLock);
- return super.openSocket(sockAddr);
+ return super.openSocket(sockAddr, new IgniteSpiOperationTimeoutController(this));
}
/**
@@ -1595,7 +1596,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res) throws IOException {
+ @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout)
+ throws IOException {
if (delayJoinAckFor != null && msg instanceof TcpDiscoveryJoinRequestMessage) {
TcpDiscoveryJoinRequestMessage msg0 = (TcpDiscoveryJoinRequestMessage)msg;
@@ -1613,7 +1615,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
}
- super.writeToSocket(msg, sock, res);
+ super.writeToSocket(msg, sock, res, timeout);
}
/** {@inheritDoc} */
[4/4] incubator-ignite git commit: ignite-752: added tests
Posted by sb...@apache.org.
ignite-752: added tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5fc818c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5fc818c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5fc818c0
Branch: refs/heads/ignite-752
Commit: 5fc818c05190f50dd6bfc753f54e74f422426c66
Parents: 0949c93
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Jul 16 16:22:45 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Jul 16 16:22:45 2015 +0300
----------------------------------------------------------------------
.../configuration/IgniteConfiguration.java | 8 +-
.../IgniteSpiOperationTimeoutController.java | 2 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 18 +-
...TcpDiscoverySpiFailureThresholdSelfTest.java | 215 +++++++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 1 +
5 files changed, 232 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fc818c0/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index b3d2bfc..49b53c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -190,7 +190,7 @@ public class IgniteConfiguration {
public static final boolean DFLT_CACHE_SANITY_CHECK_ENABLED = true;
/** Default failure detection threshold used by DiscoverySpi and CommunicationSpi in millis. */
- public static final int DFLT_FAILURE_DETECTION_THRESHOLD = 10_000;
+ public static final long DFLT_FAILURE_DETECTION_THRESHOLD = 10_000;
/** Optional grid name. */
private String gridName;
@@ -370,7 +370,7 @@ public class IgniteConfiguration {
private int timeSrvPortRange = DFLT_TIME_SERVER_PORT_RANGE;
/** Failure detection threshold used by DiscoverySpi and CommunicationSpi. */
- private int failureDetectionThreshold = DFLT_FAILURE_DETECTION_THRESHOLD;
+ private long failureDetectionThreshold = DFLT_FAILURE_DETECTION_THRESHOLD;
/** Property names to include into node attributes. */
private String[] includeProps;
@@ -1639,7 +1639,7 @@ public class IgniteConfiguration {
* TODO: IGNITE-752
* @return
*/
- public int getFailureDetectionThreshold() {
+ public long getFailureDetectionThreshold() {
return failureDetectionThreshold;
}
@@ -1647,7 +1647,7 @@ public class IgniteConfiguration {
* TODO: IGNITE-752
* @param failureDetectionThreshold
*/
- public void setFailureDetectionThreshold(int failureDetectionThreshold) {
+ public void setFailureDetectionThreshold(long failureDetectionThreshold) {
this.failureDetectionThreshold = failureDetectionThreshold;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fc818c0/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
index ba95871..84ffd0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
@@ -48,7 +48,8 @@ public class IgniteSpiOperationTimeoutController {
failureDetectionThreshold = adapter.failureDetectionThreshold();
- assert !failureDetectionThresholdEnabled || failureDetectionThreshold > 0 : " [failureDetectionThreshold=" +
- failureDetectionThreshold + ", thresholdEnabled=" + failureDetectionThresholdEnabled + ']';
+ assert !failureDetectionThresholdEnabled || failureDetectionThreshold > 0 :
+ "Invalid failure detection threshold [failureDetectionThreshold=" + failureDetectionThreshold +
+ ", thresholdEnabled=" + failureDetectionThresholdEnabled + ']';
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fc818c0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 5be7ab9..eb131b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1562,27 +1562,31 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
@Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
initFailureDetectionThreshold();
- if (!failureDetectionThresholdEnabled()) {
- if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) {
+ if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) {
+ if (!failureDetectionThresholdEnabled()) {
if (ackTimeout == 0)
ackTimeout = DFLT_ACK_TIMEOUT_CLIENT;
if (sockTimeout == 0)
sockTimeout = DFLT_SOCK_TIMEOUT_CLIENT;
+ }
- impl = new ClientImpl(this);
+ impl = new ClientImpl(this);
- ctxInitLatch.countDown();
- } else {
+ ctxInitLatch.countDown();
+ } else {
+ if (!failureDetectionThresholdEnabled()) {
if (ackTimeout == 0)
ackTimeout = DFLT_ACK_TIMEOUT;
if (sockTimeout == 0)
sockTimeout = DFLT_SOCK_TIMEOUT;
-
- impl = new ServerImpl(this);
}
+ impl = new ServerImpl(this);
+ }
+
+ if (!failureDetectionThresholdEnabled()) {
assertParameter(netTimeout > 0, "networkTimeout > 0");
assertParameter(sockTimeout > 0, "sockTimeout > 0");
assertParameter(ackTimeout > 0, "ackTimeout > 0");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fc818c0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
new file mode 100644
index 0000000..db0d9c5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * Tests failure detection threshold support in {@link TcpDiscoverySpi}: the threshold is enabled by
+ * default and bounds the socket open and ping request write performed while pinging a node.
+ */
+public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySelfTest {
+ /** SPI created for index 0. */
+ private static TestTcpDiscoverySpi firstSpi;
+
+ /** SPI created for a non-zero index (the most recently created one wins). */
+ private static TestTcpDiscoverySpi secondSpi;
+
+ /** IP finder assigned to every SPI created by {@link #getSpi(int)}. */
+ private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected DiscoverySpi getSpi(int idx) {
+ TestTcpDiscoverySpi spi = new TestTcpDiscoverySpi();
+
+ if (idx == 0)
+ firstSpi = spi;
+ else
+ secondSpi = spi;
+
+ spi.setMetricsProvider(createMetricsProvider());
+ spi.setIpFinder(ipFinder);
+
+ return spi;
+ }
+
+ /**
+ * Checks that both SPIs, created without SPI-specific timeouts, use the default failure detection threshold.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testFailureDetectionThresholdEnabled() throws Exception {
+ assertTrue(firstSpi.failureDetectionThresholdEnabled());
+ assertTrue(secondSpi.failureDetectionThresholdEnabled());
+
+ assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD, firstSpi.failureDetectionThreshold());
+ assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD, secondSpi.failureDetectionThreshold());
+ }
+
+ /**
+ * Checks that a ping fails when opening the socket times out, both when the timeout is reported
+ * immediately and when it is reported after the current timeout chunk has been waited out.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testFailureDetectionOnSocketOpen() throws Exception {
+ try {
+ ClusterNode node = secondSpi.getLocalNode();
+
+ // Socket open fails right away.
+ firstSpi.openSocketTimeout = true;
+
+ assertFalse(firstSpi.pingNode(node.id()));
+ assertTrue(firstSpi.validTimeout);
+ assertNotNull(firstSpi.err);
+ assertEquals("Timeout: openSocketTimeout", firstSpi.err.getMessage());
+
+ // Socket open waits out the current timeout chunk before failing.
+ firstSpi.openSocketTimeout = false;
+ firstSpi.openSocketTimeoutWait = true;
+
+ assertFalse(firstSpi.pingNode(node.id()));
+ assertTrue(firstSpi.validTimeout);
+ assertNotNull(firstSpi.err);
+ assertEquals("Timeout: openSocketTimeoutWait", firstSpi.err.getMessage());
+ }
+ finally {
+ firstSpi.resetState();
+ }
+ }
+
+ /**
+ * Checks that a ping fails while the ping request write hangs for its whole timeout, and succeeds
+ * again once the write is performed normally.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testFailureDetectionOnSocketWrite() throws Exception {
+ try {
+ ClusterNode node = secondSpi.getLocalNode();
+
+ firstSpi.writeToSocketTimeoutWait = true;
+
+ assertFalse(firstSpi.pingNode(node.id()));
+ assertTrue(firstSpi.validTimeout);
+
+ firstSpi.writeToSocketTimeoutWait = false;
+
+ assertTrue(firstSpi.pingNode(node.id()));
+ assertTrue(firstSpi.validTimeout);
+ }
+ finally {
+ firstSpi.resetState();
+ }
+ }
+
+ /**
+ * Discovery SPI whose socket open and ping request write can be made to time out on demand.
+ */
+ private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** If {@code true}, {@link #openSocket} throws a timeout exception immediately. */
+ private boolean openSocketTimeout;
+
+ /** If {@code true}, {@link #openSocket} waits out the next timeout chunk and throws if the controller then times out. */
+ private boolean openSocketTimeoutWait;
+
+ /** If {@code true}, {@link #writeToSocket} sleeps for the write timeout instead of sending the ping request. */
+ private boolean writeToSocketTimeoutWait;
+
+ /** Cleared if a ping request write is attempted with a timeout not below the failure detection threshold. */
+ private boolean validTimeout = true;
+
+ /** Last timeout exception thrown by {@link #openSocket}. */
+ private IgniteSpiOperationTimeoutException err;
+
+ /** {@inheritDoc} */
+ @Override protected Socket openSocket(InetSocketAddress sockAddr,
+ IgniteSpiOperationTimeoutController timeoutCtrl) throws IOException, IgniteSpiOperationTimeoutException {
+ if (openSocketTimeout) {
+ err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeout");
+
+ throw err;
+ }
+ else if (openSocketTimeoutWait) {
+ long timeout = timeoutCtrl.nextTimeoutChunk(0);
+
+ try {
+ Thread.sleep(timeout);
+ }
+ catch (InterruptedException ignored) {
+ // Restore the interrupt status for the caller.
+ Thread.currentThread().interrupt();
+ }
+
+ try {
+ timeoutCtrl.nextTimeoutChunk(0);
+ }
+ catch (IgniteSpiOperationTimeoutException e) {
+ err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeoutWait");
+
+ throw err;
+ }
+ }
+
+ Socket sock = super.openSocket(sockAddr, timeoutCtrl);
+
+ // NOTE(review): presumably delays on purpose so that part of the failure detection threshold is
+ // already used up before the ping request is written - confirm.
+ try {
+ Thread.sleep(1500);
+ }
+ catch (InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+ }
+
+ return sock;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
+ throws IOException, IgniteCheckedException {
+ if (!(msg instanceof TcpDiscoveryPingRequest)) {
+ super.writeToSocket(sock, msg, timeout);
+
+ return;
+ }
+
+ // The ping write timeout must stay strictly below the configured failure detection threshold.
+ if (timeout >= failureDetectionThreshold()) {
+ validTimeout = false;
+
+ throw new IgniteCheckedException("Invalid timeout: " + timeout);
+ }
+
+ if (writeToSocketTimeoutWait) {
+ // Simulate a write that hangs for the whole timeout without sending anything.
+ try {
+ Thread.sleep(timeout);
+ }
+ catch (InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ else
+ super.writeToSocket(sock, msg, timeout);
+ }
+
+ /**
+ * Clears all failure simulation flags and the recorded state.
+ */
+ private void resetState() {
+ openSocketTimeout = false;
+ openSocketTimeoutWait = false;
+ writeToSocketTimeoutWait = false;
+ err = null;
+ validTimeout = true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fc818c0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 6f59f14..357fd93 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -44,6 +44,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(TcpDiscoverySelfTest.class));
suite.addTest(new TestSuite(TcpDiscoverySpiSelfTest.class));
+ suite.addTest(new TestSuite(TcpDiscoverySpiFailureThresholdSelfTest.class));
suite.addTest(new TestSuite(TcpDiscoverySpiStartStopSelfTest.class));
suite.addTest(new TestSuite(TcpDiscoverySpiConfigSelfTest.class));
suite.addTest(new TestSuite(TcpDiscoveryMarshallerCheckSelfTest.class));
[3/4] incubator-ignite git commit: ignite-752: failure detection
threshold support in TcpCommunicationSpi
Posted by sb...@apache.org.
ignite-752: failure detection threshold support in TcpCommunicationSpi
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0949c93f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0949c93f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0949c93f
Branch: refs/heads/ignite-752
Commit: 0949c93fe1d1dd435b906fb1d79263e9073c0f1b
Parents: c3c0ef8
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Jul 16 13:25:48 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Jul 16 13:25:48 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 20 ++--
.../IgniteSpiOperationTimeoutController.java | 7 +-
.../communication/tcp/TcpCommunicationSpi.java | 106 +++++++++++++++----
.../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +-
4 files changed, 102 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 422ce81..500c461 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -203,15 +203,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
spiCtx = new GridDummySpiContext(locNode, true, spiCtx);
}
- /** {@inheritDoc} */
- @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
- if (!failureDetectionThresholdEnabled) {
- failureDetectionThreshold = ignite.configuration().getFailureDetectionThreshold();
-
- assertParameter(failureDetectionThreshold > 0, "failureDetectionThreshold > 0");
- }
- }
-
/**
* Inject ignite instance.
*/
@@ -579,6 +570,17 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
/**
* TODO: IGNITE-752
+ */
+ protected void initFailureDetectionThreshold() {
+ if (failureDetectionThresholdEnabled) {
+ failureDetectionThreshold = ignite.configuration().getFailureDetectionThreshold();
+
+ assertParameter(failureDetectionThreshold > 0, "failureDetectionThreshold > 0");
+ }
+ }
+
+ /**
+ * TODO: IGNITE-752
* @param enabled
*/
public void failureDetectionThresholdEnabled(boolean enabled) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
index 3ae4fa4..ba95871 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
@@ -46,6 +46,9 @@ public class IgniteSpiOperationTimeoutController {
public IgniteSpiOperationTimeoutController(IgniteSpiAdapter adapter) {
failureDetectionThresholdEnabled = adapter.failureDetectionThresholdEnabled();
failureDetectionThreshold = adapter.failureDetectionThreshold();
+
+ assert !failureDetectionThresholdEnabled || failureDetectionThreshold > 0 : " [failureDetectionThreshold=" +
+ failureDetectionThreshold + ", thresholdEnabled=" + failureDetectionThresholdEnabled + ']';
}
/**
@@ -70,8 +73,8 @@ public class IgniteSpiOperationTimeoutController {
lastOperStartTs = curTs;
if (timeout <= 0)
- throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase failure detection" +
- " threshold using IgniteConfiguration.setFailureDetectionThreshold() or set SPI specific timeouts" +
+ throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " +
+ "'failureDetectionThreshold' configuration property or set SPI specific timeouts" +
" manually. Current failure detection threshold: " + failureDetectionThreshold);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 4ca2995..b2bc9eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -970,6 +970,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@IgniteSpiConfiguration(optional = true)
public void setConnectTimeout(long connTimeout) {
this.connTimeout = connTimeout;
+
+ failureDetectionThresholdEnabled(false);
}
/** {@inheritDoc} */
@@ -992,6 +994,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@IgniteSpiConfiguration(optional = true)
public void setMaxConnectTimeout(long maxConnTimeout) {
this.maxConnTimeout = maxConnTimeout;
+
+ failureDetectionThresholdEnabled(false);
}
/** {@inheritDoc} */
@@ -1010,6 +1014,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@IgniteSpiConfiguration(optional = true)
public void setReconnectCount(int reconCnt) {
this.reconCnt = reconCnt;
+
+ failureDetectionThresholdEnabled(false);
}
/** {@inheritDoc} */
@@ -1239,6 +1245,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
nodeIdMsg = new NodeIdMessage(getLocalNodeId());
+ initFailureDetectionThreshold();
+
assertParameter(locPort > 1023, "locPort > 1023");
assertParameter(locPort <= 0xffff, "locPort < 0xffff");
assertParameter(locPortRange >= 0, "locPortRange >= 0");
@@ -1247,10 +1255,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0");
assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
- assertParameter(reconCnt > 0, "reconnectCnt > 0");
assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
- assertParameter(connTimeout >= 0, "connTimeout >= 0");
- assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
+
+ if (!failureDetectionThresholdEnabled()) {
+ assertParameter(reconCnt > 0, "reconnectCnt > 0");
+ assertParameter(connTimeout >= 0, "connTimeout >= 0");
+ assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
+ }
+
assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0");
assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0");
assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0");
@@ -1260,7 +1272,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
"Specified 'unackedMsgsBufSize' is too low, it should be at least 'msgQueueLimit * 5'.");
assertParameter(unackedMsgsBufSize >= ackSndThreshold * 5,
- "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'.");
+ "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'.");
}
try {
@@ -1326,9 +1338,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log.debug(configInfo("sockRcvBuf", sockRcvBuf));
log.debug(configInfo("shmemPort", shmemPort));
log.debug(configInfo("msgQueueLimit", msgQueueLimit));
- log.debug(configInfo("connTimeout", connTimeout));
- log.debug(configInfo("maxConnTimeout", maxConnTimeout));
- log.debug(configInfo("reconCnt", reconCnt));
+
+ if (!failureDetectionThresholdEnabled()) {
+ log.debug(configInfo("connTimeout", connTimeout));
+ log.debug(configInfo("maxConnTimeout", maxConnTimeout));
+ log.debug(configInfo("reconCnt", reconCnt));
+ }
+ else
+ log.debug(configInfo("failureDetectionThreshold", failureDetectionThreshold()));
+
log.debug(configInfo("sockWriteTimeout", sockWriteTimeout));
log.debug(configInfo("ackSndThreshold", ackSndThreshold));
log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize));
@@ -1850,17 +1868,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
long connTimeout0 = connTimeout;
+ IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(this);
+
while (true) {
GridCommunicationClient client;
try {
client = new GridShmemCommunicationClient(metricsLsnr,
port,
- connTimeout,
+ timeoutCtrl.nextTimeoutChunk(connTimeout),
log,
getSpiContext().messageFormatter());
}
catch (IgniteCheckedException e) {
+ if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+ throw e;
+
// Reconnect for the second time, if connection is not established.
if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) {
connectAttempts++;
@@ -1872,15 +1895,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
try {
- safeHandshake(client, null, node.id(), connTimeout0);
+ safeHandshake(client, null, node.id(), timeoutCtrl.nextTimeoutChunk(connTimeout0));
}
- catch (HandshakeTimeoutException e) {
+ catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
+ client.forceClose();
+
+ if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException ||
+ timeoutCtrl.checkFailureDetectionThresholdReached(e))) {
+ log.debug("Handshake timed out (failure threshold reached) [failureDetectionThreshold=" +
+ failureDetectionThreshold() + ", err=" + e.getMessage() + ", client=" + client + ']');
+
+ throw e;
+ }
+
+ assert !failureDetectionThresholdEnabled();
+
if (log.isDebugEnabled())
- log.debug("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
+ log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
", err=" + e.getMessage() + ", client=" + client + ']');
- client.forceClose();
-
if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
if (log.isDebugEnabled())
log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
@@ -1889,8 +1922,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
", err=" + e.getMessage() + ", client=" + client + ']');
throw e;
- }
- else {
+ } else {
attempt++;
connTimeout0 *= 2;
@@ -1994,6 +2026,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
int attempt = 1;
+ IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(this);
+
while (!conn) { // Reconnection on handshake timeout.
try {
SocketChannel ch = SocketChannel.open();
@@ -2020,9 +2054,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
long rcvCnt = -1;
try {
- ch.socket().connect(addr, (int)connTimeout);
+ ch.socket().connect(addr, (int)timeoutCtrl.nextTimeoutChunk(connTimeout));
- rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0);
+ rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), timeoutCtrl.nextTimeoutChunk(connTimeout0));
if (rcvCnt == -1)
return null;
@@ -2056,14 +2090,38 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
}
}
- catch (HandshakeTimeoutException e) {
+ catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
if (client != null) {
client.forceClose();
client = null;
}
- onException("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
+ if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException ||
+ timeoutCtrl.checkFailureDetectionThresholdReached(e))) {
+
+ String msg = "Handshake timed out (failure detection threshold is reached) " +
+ "[failureDetectionThreshold=" + failureDetectionThreshold() + ", addr=" + addr + ']';
+
+ onException(msg, e);
+
+ if (log.isDebugEnabled())
+ log.debug(msg);
+
+ if (errs == null)
+ errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
+ "Make sure that each GridComputeTask and GridCacheTransaction has a timeout set " +
+ "in order to prevent parties from waiting forever in case of network issues " +
+ "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+
+ errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+
+ break;
+ }
+
+ assert !failureDetectionThresholdEnabled();
+
+ onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
", addr=" + addr + ']', e);
if (log.isDebugEnabled())
@@ -2108,7 +2166,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (log.isDebugEnabled())
log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
- if (X.hasCause(e, SocketTimeoutException.class))
+ boolean failureDetThrReached = timeoutCtrl.checkFailureDetectionThresholdReached(e);
+
+ if (failureDetThrReached)
+ LT.warn(log, null, "Connect timed out (consider increasing 'failureDetectionThreshold' " +
+ "configuration property) [addr=" + addr + ", failureDetectionThreshold=" +
+ failureDetectionThreshold() + ']');
+ else if (X.hasCause(e, SocketTimeoutException.class))
LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " +
"configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']');
@@ -2121,7 +2185,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
// Reconnect for the second time, if connection is not established.
- if (connectAttempts < 2 &&
+ if (!failureDetThrReached && connectAttempts < 2 &&
(e instanceof ConnectException || X.hasCause(e, ConnectException.class))) {
connectAttempts++;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 5ed946b..5be7ab9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1560,7 +1560,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** {@inheritDoc} */
@Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
- super.spiStart(gridName);
+ initFailureDetectionThreshold();
if (!failureDetectionThresholdEnabled()) {
if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) {