You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this rendering.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/16 15:22:58 UTC
[2/4] incubator-ignite git commit: ignite-752: support for failure detection threshold in ClientImpl
ignite-752: support for failure detection threshold in ClientImpl
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c3c0ef87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c3c0ef87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c3c0ef87
Branch: refs/heads/ignite-752
Commit: c3c0ef87be79d71aa922d9a11e2d205df19a7829
Parents: 8e37d32
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Jul 16 11:52:57 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Jul 16 11:52:57 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 83 ++++++++++++++------
.../tcp/TcpClientDiscoverySpiSelfTest.java | 14 ++--
2 files changed, 69 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c3c0ef87/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 3f05f59..62b2e35 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -194,7 +194,8 @@ class ClientImpl extends TcpDiscoveryImpl {
msgWorker.addMessage(SPI_STOP);
try {
- if (!leaveLatch.await(spi.netTimeout, MILLISECONDS))
+ if (!leaveLatch.await(spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
+ spi.getNetworkTimeout(), MILLISECONDS))
U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']');
}
catch (InterruptedException ignored) {
@@ -272,7 +273,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (pingFuts.remove(nodeId, finalFut))
finalFut.onDone(false);
}
- }, spi.netTimeout);
+ }, spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : spi.getNetworkTimeout());
sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
}
@@ -456,13 +457,17 @@ class ClientImpl extends TcpDiscoveryImpl {
Collection<Throwable> errs = null;
- long ackTimeout0 = spi.ackTimeout;
+ long ackTimeout0 = spi.getAckTimeout();
+
+ int reconCnt = 0;
int connectAttempts = 1;
UUID locNodeId = getLocalNodeId();
- for (int i = 0; i < spi.reconCnt; i++) {
+ IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+
+ while (true) {
boolean openSock = false;
Socket sock = null;
@@ -470,7 +475,7 @@ class ClientImpl extends TcpDiscoveryImpl {
try {
long tstamp = U.currentTimeMillis();
- sock = spi.openSocket(addr);
+ sock = spi.openSocket(addr, timeoutCtrl);
openSock = true;
@@ -478,7 +483,7 @@ class ClientImpl extends TcpDiscoveryImpl {
req.client(true);
- spi.writeToSocket(sock, req);
+ spi.writeToSocket(sock, req, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
@@ -499,7 +504,7 @@ class ClientImpl extends TcpDiscoveryImpl {
msg.client(true);
- spi.writeToSocket(sock, msg);
+ spi.writeToSocket(sock, msg, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
@@ -522,6 +527,12 @@ class ClientImpl extends TcpDiscoveryImpl {
errs.add(e);
+ if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+ break;
+
+ if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
+ break;
+
if (!openSock) {
// Reconnect for the second time, if connection is not established.
if (connectAttempts < 2) {
@@ -533,7 +544,8 @@ class ClientImpl extends TcpDiscoveryImpl {
break; // Don't retry if we can not establish connection.
}
- if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
+ if (!spi.failureDetectionThresholdEnabled() && (e instanceof SocketTimeoutException ||
+ X.hasCause(e, SocketTimeoutException.class))) {
ackTimeout0 *= 2;
if (!checkAckTimeout(ackTimeout0))
@@ -825,11 +837,17 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
+ /** */
+ private final long socketTimeout;
+
/**
*
*/
protected SocketWriter() {
super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);
+
+ socketTimeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
+ spi.getSocketTimeout();
}
/**
@@ -893,7 +911,7 @@ class ClientImpl extends TcpDiscoveryImpl {
msgLsnr.apply(msg);
try {
- spi.writeToSocket(sock, msg);
+ spi.writeToSocket(sock, msg, socketTimeout);
msg = null;
}
@@ -954,7 +972,8 @@ class ClientImpl extends TcpDiscoveryImpl {
Exception err = null;
- long timeout = join ? spi.joinTimeout : spi.netTimeout;
+ long timeout = join ? spi.getJoinTimeout() : (spi.failureDetectionThresholdEnabled() ?
+ spi.failureDetectionThreshold() : spi.getNetworkTimeout());
long startTime = U.currentTimeMillis();
@@ -966,11 +985,17 @@ class ClientImpl extends TcpDiscoveryImpl {
if (join) {
joinError(new IgniteSpiException("Join process timed out, connection failed and " +
"failed to reconnect (consider increasing 'joinTimeout' configuration property) " +
- "[networkTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+ "[networkTimeout=" + spi.getJoinTimeout() + ", sock=" + sock + ']'));
}
else
- U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
- "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
+ U.error(log, "Failed to reconnect to cluster " +
+ (spi.failureDetectionThresholdEnabled() ?
+ "(consider increasing 'failureDetectionThreshold' configuration property) " +
+ "[failureDetectionThreshold=" + spi.failureDetectionThreshold()
+ :
+ "(consider increasing 'networkTimeout' configuration property) [networkTimeout="
+ + spi.getNetworkTimeout())
+ + ", sock=" + sock + ']');
return;
}
@@ -983,7 +1008,8 @@ class ClientImpl extends TcpDiscoveryImpl {
try {
oldTimeout = sock.getSoTimeout();
- sock.setSoTimeout((int)spi.netTimeout);
+ sock.setSoTimeout((int)(spi.failureDetectionThresholdEnabled() ?
+ spi.failureDetectionThreshold() : spi.getNetworkTimeout()));
InputStream in = new BufferedInputStream(sock.getInputStream());
@@ -993,6 +1019,8 @@ class ClientImpl extends TcpDiscoveryImpl {
List<TcpDiscoveryAbstractMessage> msgs = null;
while (!isInterrupted()) {
+ // How to deal with failure detection threshold? It's bigger then networkTimeout and
+ // we can't just decrease it on every unmarshal operation.
TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
if (msg instanceof TcpDiscoveryClientReconnectMessage) {
@@ -1028,10 +1056,21 @@ class ClientImpl extends TcpDiscoveryImpl {
log.error("Reconnect error [join=" + join + ", timeout=" + timeout + ']', e);
if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout) {
- String msg = join ? "Failed to connect to cluster (consider increasing 'joinTimeout' " +
- "configuration property) [joinTimeout=" + spi.joinTimeout + ", err=" + e + ']' :
- "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
- "configuration property) [networkTimeout=" + spi.netTimeout + ", err=" + e + ']';
+ String msg;
+
+ if (join)
+ msg = "Failed to connect to cluster (consider increasing 'joinTimeout' " +
+ "configuration property) [joinTimeout=" + spi.getJoinTimeout() + ", err=" + e +
+ ']';
+ else
+ msg = "Failed to reconnect to cluster " +
+ (spi.failureDetectionThresholdEnabled() ?
+ "(consider increasing 'failureDetectionThreshold' configuration property) " +
+ "[failureDetectionThreshold=" + spi.failureDetectionThreshold()
+ :
+ "(consider increasing 'networkTimeout' configuration property) " +
+ "[networkTimeout=" + spi.getNetworkTimeout()) + ", sock=" + sock + ']';
+
U.warn(log, msg);
@@ -1094,7 +1133,7 @@ class ClientImpl extends TcpDiscoveryImpl {
spi.stats.onJoinStarted();
try {
- final Socket sock = joinTopology(false, spi.joinTimeout);
+ final Socket sock = joinTopology(false, spi.getJoinTimeout());
if (sock == null) {
joinError(new IgniteSpiException("Join process timed out."));
@@ -1106,13 +1145,13 @@ class ClientImpl extends TcpDiscoveryImpl {
sockWriter.setSocket(sock);
- if (spi.joinTimeout > 0) {
+ if (spi.getJoinTimeout() > 0) {
timer.schedule(new TimerTask() {
@Override public void run() {
if (joinLatch.getCount() > 0)
queue.add(JOIN_TIMEOUT);
}
- }, spi.joinTimeout);
+ }, spi.getJoinTimeout());
}
sockReader.setSocket(sock, locNode.clientRouterNodeId());
@@ -1124,7 +1163,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (joinLatch.getCount() > 0) {
joinError(new IgniteSpiException("Join process timed out, did not receive response for " +
"join request (consider increasing 'joinTimeout' configuration property) " +
- "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+ "[joinTimeout=" + spi.getJoinTimeout() + ", sock=" + sock + ']'));
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c3c0ef87/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index ec6a526..c8f3942 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -1538,7 +1538,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
- GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException {
+ GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
waitFor(writeLock);
boolean fail = false;
@@ -1556,17 +1556,18 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
sock.close();
}
- super.writeToSocket(sock, msg, bout);
+ super.writeToSocket(sock, msg, bout, timeout);
if (afterWrite != null)
afterWrite.apply(msg, sock);
}
/** {@inheritDoc} */
- @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
+ @Override protected Socket openSocket(InetSocketAddress sockAddr,
+ IgniteSpiOperationTimeoutController timeoutCtrl) throws IOException, IgniteSpiOperationTimeoutException {
waitFor(openSockLock);
- return super.openSocket(sockAddr);
+ return super.openSocket(sockAddr, new IgniteSpiOperationTimeoutController(this));
}
/**
@@ -1595,7 +1596,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res) throws IOException {
+ @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout)
+ throws IOException {
if (delayJoinAckFor != null && msg instanceof TcpDiscoveryJoinRequestMessage) {
TcpDiscoveryJoinRequestMessage msg0 = (TcpDiscoveryJoinRequestMessage)msg;
@@ -1613,7 +1615,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
}
- super.writeToSocket(msg, sock, res);
+ super.writeToSocket(msg, sock, res, timeout);
}
/** {@inheritDoc} */