You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/21 15:33:12 UTC

incubator-ignite git commit: ignite-752: network timeout must not affect failure detection threshold

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-752 a371f049b -> 9319206bd


ignite-752: network timeout must not affect failure detection threshold


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9319206b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9319206b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9319206b

Branch: refs/heads/ignite-752
Commit: 9319206bd49146c45a4256753eb2768042011d22
Parents: a371f04
Author: Denis Magda <dm...@gridgain.com>
Authored: Tue Jul 21 16:33:04 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Tue Jul 21 16:33:04 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 56 ++++++-----------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 65 +++++++++-----------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  6 +-
 3 files changed, 50 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9319206b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 00c668a..196c1b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -203,8 +203,7 @@ class ClientImpl extends TcpDiscoveryImpl {
             msgWorker.addMessage(SPI_STOP);
 
             try {
-                if (!leaveLatch.await(spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
-                    spi.getNetworkTimeout(), MILLISECONDS))
+                if (!leaveLatch.await(spi.netTimeout, MILLISECONDS))
                     U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']');
             }
             catch (InterruptedException ignored) {
@@ -294,8 +293,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                                     finalFut.onDone(false);
                             }
                         }
-                    }, spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
-                        spi.getNetworkTimeout());
+                    }, spi.netTimeout);
 
                     sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
                 }
@@ -1076,8 +1074,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             Exception err = null;
 
-            long timeout = join ? spi.getJoinTimeout() : (spi.failureDetectionThresholdEnabled() ?
-                spi.failureDetectionThreshold() : spi.getNetworkTimeout());
+            long timeout = join ? spi.joinTimeout : spi.netTimeout;
 
             long startTime = U.currentTimeMillis();
 
@@ -1092,15 +1089,11 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (join) {
                             joinError(new IgniteSpiException("Join process timed out, connection failed and " +
                                 "failed to reconnect (consider increasing 'joinTimeout' configuration property) " +
-                                "[networkTimeout=" + spi.getJoinTimeout() + ", sock=" + sock + ']'));
+                                "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
                         }
                         else
-                            U.error(log, "Failed to reconnect to cluster " +
-                                (spi.failureDetectionThresholdEnabled() ?
-                                "(consider increasing 'failureDetectionThreshold' configuration  property) " +
-                                "[failureDetectionThreshold=" + spi.failureDetectionThreshold() :
-                                "(consider increasing 'networkTimeout' configuration  property) [networkTimeout=" +
-                                spi.getNetworkTimeout()) + ", sock=" + sock + ']');
+                            U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout'" +
+                                " configuration  property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
                         
                         return;
                     }
@@ -1116,8 +1109,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     try {
                         oldTimeout = sock.getSoTimeout();
 
-                        sock.setSoTimeout((int)(spi.failureDetectionThresholdEnabled() ?
-                            spi.failureDetectionThreshold() : spi.getNetworkTimeout()));
+                        sock.setSoTimeout((int)spi.netTimeout);
 
                         InputStream in = new BufferedInputStream(sock.getInputStream());
 
@@ -1127,8 +1119,6 @@ class ClientImpl extends TcpDiscoveryImpl {
                         List<TcpDiscoveryAbstractMessage> msgs = null;
 
                         while (!isInterrupted()) {
-                            // How to deal with failure detection threshold? It's bigger then networkTimeout and
-                            // we can't just decrease it on every unmarshal operation.
                             TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
 
                             if (msg instanceof TcpDiscoveryClientReconnectMessage) {
@@ -1174,15 +1164,11 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                             if (join)
                                 msg = "Failed to connect to cluster (consider increasing 'joinTimeout' " +
-                                    "configuration  property) [joinTimeout=" + spi.getJoinTimeout() + ", err=" + e +
-                                    ']';
+                                    "configuration  property) [joinTimeout=" + spi.joinTimeout + ", err=" + e + ']';
                             else
-                                msg = "Failed to reconnect to cluster " +
-                                    (spi.failureDetectionThresholdEnabled() ?
-                                    "(consider increasing 'failureDetectionThreshold' configuration  property) " +
-                                    "[failureDetectionThreshold=" + spi.failureDetectionThreshold() :
-                                    "(consider increasing 'networkTimeout' configuration  property) " +
-                                    "[networkTimeout=" + spi.getNetworkTimeout()) + ", sock=" + sock + ']';
+                                msg = "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
+                                    "configuration  property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock
+                                    + ']';
 
 
                             U.warn(log, msg);
@@ -1260,14 +1246,14 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (state == STARTING) {
                             joinError(new IgniteSpiException("Join process timed out, did not receive response for " +
                                 "join request (consider increasing 'joinTimeout' configuration property) " +
-                                "[joinTimeout=" + spi.getJoinTimeout() + ", sock=" + currSock + ']'));
+                                "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']'));
 
                             break;
                         }
                         else if (state == DISCONNECTED) {
                             if (log.isDebugEnabled())
                                 log.debug("Failed to reconnect, local node segmented " +
-                                    "[joinTimeout=" + spi.getJoinTimeout() + ']');
+                                    "[joinTimeout=" + spi.joinTimeout + ']');
 
                             state = SEGMENTED;
 
@@ -1326,10 +1312,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                             if (state != SEGMENTED && state != STOPPED) {
                                 if (log.isDebugEnabled()) {
                                     log.debug("Failed to restore closed connection, reconnect disabled, " +
-                                        (spi.failureDetectionThresholdEnabled() ?
-                                        "local node segmented [failureDetectionThreshold=" +
-                                        spi.failureDetectionThreshold() + ']' :
-                                        "local node segmented [networkTimeout=" + spi.getNetworkTimeout() + ']'));
+                                        "local node segmented [networkTimeout=" + spi.netTimeout + ']');
                                 }
 
                                 state = SEGMENTED;
@@ -1341,10 +1324,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                             if (state == STARTING || state == CONNECTED) {
                                 if (log.isDebugEnabled()) {
                                     log.debug("Failed to restore closed connection, will try to reconnect " +
-                                        (spi.failureDetectionThresholdEnabled() ?
-                                        "[failureDetectionThreshold=" + spi.failureDetectionThreshold() :
-                                        "[networkTimeout=" + spi.getNetworkTimeout()) +
-                                        ", joinTimeout=" + spi.getJoinTimeout() + ']');
+                                        "[networkTimeout=" + spi.netTimeout + ", joinTimeout=" + spi.joinTimeout + ']');
                                 }
 
                                 state = DISCONNECTED;
@@ -1433,7 +1413,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             joinCnt++;
 
-            T2<Socket, Boolean> joinRes = joinTopology(false, spi.getJoinTimeout());
+            T2<Socket, Boolean> joinRes = joinTopology(false, spi.joinTimeout);
 
             if (joinRes == null) {
                 if (join)
@@ -1451,7 +1431,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             sockWriter.setSocket(joinRes.get1(), joinRes.get2());
 
-            if (spi.getJoinTimeout() > 0) {
+            if (spi.joinTimeout > 0) {
                 final int joinCnt0 = joinCnt;
 
                 timer.schedule(new TimerTask() {
@@ -1459,7 +1439,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (joinCnt == joinCnt0 && joining())
                             queue.add(JOIN_TIMEOUT);
                     }
-                }, spi.getJoinTimeout());
+                }, spi.joinTimeout);
             }
 
             sockReader.setSocket(joinRes.get1(), locNode.clientRouterNodeId());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9319206b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index b475768..02e13ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -278,8 +278,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNode.id()));
 
             synchronized (mux) {
-                long timeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
-                    spi.getNetworkTimeout();
+                long timeout = spi.netTimeout;
 
                 long threshold = U.currentTimeMillis() + timeout;
 
@@ -535,7 +534,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
 
                         TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutCtrl.nextTimeoutChunk(
-                            spi.getNetworkTimeout()));
+                            spi.getAckTimeout()));
 
                         if (locNodeId.equals(res.creatorNodeId())) {
                             if (log.isDebugEnabled())
@@ -701,8 +700,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 log.debug("Join request message has been sent (waiting for coordinator response).");
 
             synchronized (mux) {
-                long timeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
-                    spi.getNetworkTimeout();
+                long timeout = spi.netTimeout;
 
                 long threshold = U.currentTimeMillis() + timeout;
 
@@ -746,9 +744,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     LT.warn(log, null, "Node has not been connected to topology and will repeat join process. " +
                         "Check remote nodes logs for possible error messages. " +
                         "Note that large topology may require significant time to start. " +
-                        "Increase 'IgniteConfiguration.failureDetectionThreshold' configuration property " +
-                        "if getting this message on the starting nodes [failureDetectionThreshold=" +
-                        spi.failureDetectionThreshold() + ']');
+                        "Increase 'TcpDiscoverySpi.networkTimeout' configuration property " +
+                        "if getting this message on the starting nodes [networkTimeout=" + spi.netTimeout + ']');
             }
         }
 
@@ -868,10 +865,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                         "(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " +
                         addrs);
 
-                if (spi.getJoinTimeout() > 0) {
+                if (spi.joinTimeout > 0) {
                     if (noResStart == 0)
                         noResStart = U.currentTimeMillis();
-                    else if (U.currentTimeMillis() - noResStart > spi.getJoinTimeout())
+                    else if (U.currentTimeMillis() - noResStart > spi.joinTimeout)
                         throw new IgniteSpiException(
                             "Failed to connect to any address from IP finder within join timeout " +
                                 "(make sure IP finder addresses are correct, and operating system firewalls are disabled " +
@@ -2263,7 +2260,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                 // Handshake.
                                 writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId),
-                                    timeoutCtrl.nextTimeoutChunk(spi.getNetworkTimeout()));
+                                    timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
 
                                 TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
                                     timeoutCtrl.nextTimeoutChunk(ackTimeout0));
@@ -3829,27 +3826,29 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                     if (log.isDebugEnabled())
                                         log.debug("Responded to status check message " +
-                                            "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']');
-                                }
-                                catch (IgniteSpiException e) {
+                                                      "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status
+                                            () + ']');
+                                } catch (IgniteSpiException e) {
                                     if (e.hasCause(SocketException.class)) {
                                         if (log.isDebugEnabled())
                                             log.debug("Failed to respond to status check message (connection " +
-                                                "refused) [recipient=" + msg.creatorNodeId() + ", status=" +
-                                                msg.status() + ']');
+                                                          "refused) [recipient=" + msg.creatorNodeId() + ", status=" +
+                                                          msg.status() + ']');
 
                                         onException("Failed to respond to status check message (connection refused) " +
-                                            "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']', e);
-                                    }
-                                    else {
+                                                        "[recipient=" + msg.creatorNodeId() + ", status=" + msg
+                                                        .status() + ']',
+                                                    e);
+                                    } else {
                                         if (pingNode(msg.creatorNode()))
                                             // Node exists and accepts incoming connections.
                                             U.error(log, "Failed to respond to status check message [recipient=" +
                                                 msg.creatorNodeId() + ", status=" + msg.status() + ']', e);
                                         else if (log.isDebugEnabled())
                                             log.debug("Failed to respond to status check message (did the node " +
-                                                "stop?) [recipient=" + msg.creatorNodeId() + ", status=" + msg.status()
-                                                + ']');
+                                                          "stop?) [recipient=" + msg.creatorNodeId() + ", status=" +
+                                                          msg.status()
+                                                          + ']');
                                     }
                                 }
                             }
@@ -4407,8 +4406,6 @@ class ServerImpl extends TcpDiscoveryImpl {
             try {
                 InputStream in;
 
-                IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
-
                 try {
                     // Set socket options.
                     sock.setKeepAlive(true);
@@ -4416,7 +4413,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     int timeout = sock.getSoTimeout();
 
-                    sock.setSoTimeout((int)timeoutCtrl.nextTimeoutChunk(spi.getNetworkTimeout()));
+                    sock.setSoTimeout((int)spi.netTimeout);
 
                     for (IgniteInClosure<Socket> connLsnr : spi.incomeConnLsnrs)
                         connLsnr.apply(sock);
@@ -4461,8 +4458,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     // Restore timeout.
                     sock.setSoTimeout(timeout);
 
-                    TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, timeoutCtrl.nextTimeoutChunk(
-                        spi.getNetworkTimeout()));
+                    TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, spi.netTimeout);
 
                     // Ping.
                     if (msg instanceof TcpDiscoveryPingRequest) {
@@ -4471,6 +4467,9 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                             TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId);
 
+                            IgniteSpiOperationTimeoutController timeoutCtrl =
+                                new IgniteSpiOperationTimeoutController(spi);
+
                             if (req.clientNodeId() != null) {
                                 ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId());
 
@@ -4486,8 +4485,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                         return;
                     }
 
-
-
                     // Handshake.
                     TcpDiscoveryHandshakeRequest req = (TcpDiscoveryHandshakeRequest)msg;
 
@@ -4501,7 +4498,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     if (req.client())
                         res.clientAck(true);
 
-                    spi.writeToSocket(sock, res, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
+                    spi.writeToSocket(sock, res, spi.failureDetectionThresholdEnabled() ?
+                        spi.failureDetectionThreshold() : spi.getSocketTimeout());
 
                     // It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
                     // the local node sends a handshake request message on the loopback address, so we get here.
@@ -4591,15 +4589,10 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     onException("Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
 
-                    if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
-                        LT.warn(log, null, "Socket operations timed out " +
-                            "(consider increasing 'failureDetectionThreshold' configuration property) " +
-                            "[failureDetectionThreshold=" + spi.failureDetectionThreshold() + ']');
-
-                    else if (!spi.failureDetectionThresholdEnabled() && e.hasCause(SocketTimeoutException.class))
+                    if (e.hasCause(SocketTimeoutException.class))
                         LT.warn(log, null, "Socket operation timed out on handshake " +
                             "(consider increasing 'networkTimeout' configuration property) " +
-                            "[netTimeout=" + spi.getNetworkTimeout() + ']');
+                            "[netTimeout=" + spi.netTimeout + ']');
 
                     else if (e.hasCause(ClassNotFoundException.class))
                         LT.warn(log, null, "Failed to read message due to ClassNotFoundException " +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9319206b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 035ac1a..87848d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -239,11 +239,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     private long ackTimeout; // Must be initialized in the constructor of child class.
 
     /** Network timeout. */
-    private long netTimeout = DFLT_NETWORK_TIMEOUT;
+    protected long netTimeout = DFLT_NETWORK_TIMEOUT;
 
     /** Join timeout. */
     @SuppressWarnings("RedundantFieldInitialization")
-    private long joinTimeout = DFLT_JOIN_TIMEOUT;
+    protected long joinTimeout = DFLT_JOIN_TIMEOUT;
 
     /** Thread priority for all threads started by SPI. */
     protected int threadPri = DFLT_THREAD_PRI;
@@ -1666,7 +1666,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         }
 
         if (!failureDetectionThresholdEnabled()) {
-            assertParameter(netTimeout > 0, "networkTimeout > 0");
             assertParameter(sockTimeout > 0, "sockTimeout > 0");
             assertParameter(ackTimeout > 0, "ackTimeout > 0");
             assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout");
@@ -1675,6 +1674,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         else
             assertParameter(connCheckFreq < failureDetectionThreshold(), "failureDetectionThreshold > connCheckFreq");
 
+        assertParameter(netTimeout > 0, "networkTimeout > 0");
         assertParameter(ipFinder != null, "ipFinder != null");
         assertParameter(hbFreq > 0, "heartbeatFreq > 0");