You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/16 15:22:57 UTC

[1/4] incubator-ignite git commit: ignite-752: supported failure detection threshold in ServerImpl

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-752 1b6005a50 -> 5fc818c05


ignite-752: supported failure detection threshold in ServerImpl


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8e37d32a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8e37d32a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8e37d32a

Branch: refs/heads/ignite-752
Commit: 8e37d32a388b5a6b6a1cd7cacbcd70f8db9dc2c2
Parents: 1b6005a
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Jul 16 10:53:36 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Jul 16 10:53:36 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 129 +++++++++++++------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   6 +-
 2 files changed, 92 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e37d32a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index d506507..fca3104 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -474,6 +474,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         UUID locNodeId = getLocalNodeId();
 
+        IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+
         if (F.contains(spi.locNodeAddrs, addr)) {
             if (clientNodeId == null)
                 return F.t(getLocalNodeId(), false);
@@ -486,7 +488,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             boolean clientPingRes;
 
             try {
-                clientPingRes = clientWorker.ping();
+                clientPingRes = clientWorker.ping(timeoutCtrl);
             }
             catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
@@ -509,8 +511,6 @@ class ServerImpl extends TcpDiscoveryImpl {
             try {
                 Socket sock = null;
 
-                IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
-
                 int reconCnt = 0;
 
                 while (true) {
@@ -899,6 +899,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
 
+        int reconCnt = 0;
+
         while (true){
             // Need to set to false on each new iteration,
             // since remote node may leave in the middle of the first iteration.
@@ -980,6 +982,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
                     break;
 
+                if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
+                    break;
+
                 if (!openSock) {
                     // Reconnect for the second time, if connection is not established.
                     if (connectAttempts < 2) {
@@ -2041,10 +2046,14 @@ class ServerImpl extends TcpDiscoveryImpl {
                         continue;
                     }
 
+                    int reconCnt = 0;
+
+                    IgniteSpiOperationTimeoutController timeoutCtrl = null;
+
                     while (true) {
                         if (sock == null) {
-                            IgniteSpiOperationTimeoutController timeoutCrt =
-                                new IgniteSpiOperationTimeoutController(spi);
+                            if (timeoutCtrl == null)
+                                timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
 
                             nextNodeExists = false;
 
@@ -2056,16 +2065,16 @@ class ServerImpl extends TcpDiscoveryImpl {
                             try {
                                 long tstamp = U.currentTimeMillis();
 
-                                sock = spi.openSocket(addr, timeoutCrt);
+                                sock = spi.openSocket(addr, timeoutCtrl);
 
                                 openSock = true;
 
                                 // Handshake.
                                 writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId),
-                                    timeoutCrt.nextTimeoutChunk(spi.getNetworkTimeout()));
+                                    timeoutCtrl.nextTimeoutChunk(spi.getNetworkTimeout()));
 
                                 TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
-                                    timeoutCrt.nextTimeoutChunk(ackTimeout0));
+                                    timeoutCtrl.nextTimeoutChunk(ackTimeout0));
 
                                 if (locNodeId.equals(res.creatorNodeId())) {
                                     if (log.isDebugEnabled())
@@ -2149,7 +2158,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 if (!openSock)
                                     break; // Don't retry if we can not establish connection.
 
-                                if (timeoutCrt.checkFailureDetectionThresholdReached(e))
+                                if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
+                                    break;
+
+                                if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
                                     break;
                                 else if (!spi.failureDetectionThresholdEnabled() && (e instanceof
                                     SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) {
@@ -2167,9 +2179,13 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                     sock = null;
                                 }
-                                else
+                                else {
                                     // Next node exists and accepts incoming messages.
                                     nextNodeExists = true;
+                                    // Resetting timeout control object to let the code below use a new one
+                                    // for the next bunch of operations.
+                                    timeoutCtrl = null;
+                                }
                             }
                         }
 
@@ -2182,8 +2198,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                             assert !forceSndPending || msg instanceof TcpDiscoveryNodeLeftMessage;
 
-                            IgniteSpiOperationTimeoutController timeoutCtrl;
-
                             if (failure || forceSndPending) {
                                 if (log.isDebugEnabled())
                                     log.debug("Pending messages will be sent [failure=" + failure +
@@ -2196,8 +2210,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 boolean skip = pendingMsgs.discardId != null;
 
                                 for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
-                                    timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
-
                                     if (skip) {
                                         if (pendingMsg.id().equals(pendingMsgs.discardId))
                                             skip = false;
@@ -2210,6 +2222,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
                                         pendingMsgs.discardId);
 
+                                    if (timeoutCtrl == null)
+                                        timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+
                                     try {
                                         writeToSocket(sock, pendingMsg, timeoutCtrl.nextTimeoutChunk(
                                             spi.getSocketTimeout()));
@@ -2231,20 +2246,26 @@ class ServerImpl extends TcpDiscoveryImpl {
                                         debugLog("Pending message has been sent to next node [msg=" + msg.id() +
                                             ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
                                             ", res=" + res + ']');
+
+                                    // Resetting timeout control object to create a new one for the next bunch of
+                                    // operations.
+                                    timeoutCtrl = null;
                                 }
                             }
 
                             prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
 
-                            timeoutCtrl
                             try {
                                 long tstamp = U.currentTimeMillis();
 
-                                writeToSocket(sock, msg);
+                                if (timeoutCtrl == null)
+                                    timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+
+                                writeToSocket(sock, msg, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
 
                                 spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
 
-                                int res = spi.readReceipt(sock, ackTimeout0);
+                                int res = spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0));
 
                                 if (log.isDebugEnabled())
                                     log.debug("Message has been sent to next node [msg=" + msg +
@@ -2279,11 +2300,19 @@ class ServerImpl extends TcpDiscoveryImpl {
                             onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']',
                                 e);
 
-                            if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
-                                ackTimeout0 *= 2;
+                            if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+                                break;
 
-                                if (!checkAckTimeout(ackTimeout0))
+                            if (!spi.failureDetectionThresholdEnabled()) {
+                                if (++reconCnt == spi.getReconnectCount())
                                     break;
+                                else if (e instanceof SocketTimeoutException ||
+                                    X.hasCause(e, SocketTimeoutException.class)) {
+                                    ackTimeout0 *= 2;
+
+                                    if (!checkAckTimeout(ackTimeout0))
+                                        break;
+                                }
                             }
                         }
                         finally {
@@ -2296,7 +2325,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                 if (log.isDebugEnabled())
                                     log.debug("Message has not been sent [next=" + next.id() + ", msg=" + msg +
-                                        ", i=" + i + ']');
+                                        (!spi.failureDetectionThresholdEnabled() ? ", i=" + reconCnt : "") + ']');
                             }
                         }
                     } // Try to reconnect.
@@ -3360,7 +3389,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
                 else if (leftNode.equals(next) && sock != null) {
                     try {
-                        writeToSocket(sock, msg);
+                        writeToSocket(sock, msg, spi.failureDetectionThresholdEnabled() ?
+                            spi.failureDetectionThreshold() : spi.getSocketTimeout());
 
                         if (log.isDebugEnabled())
                             log.debug("Sent verified node left message to leaving node: " + msg);
@@ -4119,6 +4149,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             try {
                 InputStream in;
 
+                IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+
                 try {
                     // Set socket options.
                     sock.setKeepAlive(true);
@@ -4126,7 +4158,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     int timeout = sock.getSoTimeout();
 
-                    sock.setSoTimeout((int)spi.netTimeout);
+                    sock.setSoTimeout((int)timeoutCtrl.nextTimeoutChunk(spi.getNetworkTimeout()));
 
                     for (IgniteInClosure<Socket> connLsnr : spi.incomeConnLsnrs)
                         connLsnr.apply(sock);
@@ -4171,7 +4203,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     // Restore timeout.
                     sock.setSoTimeout(timeout);
 
-                    TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, spi.netTimeout);
+                    TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, timeoutCtrl.nextTimeoutChunk(
+                        spi.getNetworkTimeout()));
 
                     // Ping.
                     if (msg instanceof TcpDiscoveryPingRequest) {
@@ -4184,10 +4217,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId());
 
                                 if (clientWorker != null)
-                                    res.clientExists(clientWorker.ping());
+                                    res.clientExists(clientWorker.ping(timeoutCtrl));
                             }
 
-                            spi.writeToSocket(sock, res);
+                            spi.writeToSocket(sock, res, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
                         }
                         else if (log.isDebugEnabled())
                             log.debug("Ignore ping request, node is stopping.");
@@ -4205,7 +4238,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     TcpDiscoveryHandshakeResponse res =
                         new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder());
 
-                    spi.writeToSocket(sock, res);
+                    spi.writeToSocket(sock, res, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
 
                     // It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
                     // the local node sends a handshake request message on the loopback address, so we get here.
@@ -4295,10 +4328,15 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     onException("Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
 
-                    if (e.hasCause(SocketTimeoutException.class))
+                    if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+                        LT.warn(log, null, "Socket operations timed out " +
+                            "(consider increasing 'failureDetectionThreshold' configuration property) " +
+                            "[failureDetectionThreshold=" + spi.failureDetectionThreshold() + ']');
+
+                    else if (!spi.failureDetectionThresholdEnabled() && e.hasCause(SocketTimeoutException.class))
                         LT.warn(log, null, "Socket operation timed out on handshake " +
                             "(consider increasing 'networkTimeout' configuration property) " +
-                            "[netTimeout=" + spi.netTimeout + ']');
+                            "[netTimeout=" + spi.getNetworkTimeout() + ']');
 
                     else if (e.hasCause(ClassNotFoundException.class))
                         LT.warn(log, null, "Failed to read message due to ClassNotFoundException " +
@@ -4314,6 +4352,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                     return;
                 }
 
+                long socketTimeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
+                    spi.getSocketTimeout();
+
                 while (!isInterrupted()) {
                     try {
                         TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
@@ -4346,7 +4387,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 TcpDiscoverySpiState state = spiStateCopy();
 
                                 if (state == CONNECTED) {
-                                    spi.writeToSocket(msg, sock, RES_OK);
+                                    spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
 
                                     if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW)
                                         clientMsgWrk.start();
@@ -4356,7 +4397,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     continue;
                                 }
                                 else {
-                                    spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN);
+                                    spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, socketTimeout);
 
                                     break;
                                 }
@@ -4364,7 +4405,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(msg, sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
 
                             boolean ignored = false;
 
@@ -4393,7 +4434,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryAuthFailedMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(msg, sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
 
                             boolean ignored = false;
 
@@ -4422,7 +4463,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryCheckFailedMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(msg, sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
 
                             boolean ignored = false;
 
@@ -4451,7 +4492,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(msg, sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
 
                             boolean ignored = false;
 
@@ -4493,7 +4534,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         // Send receipt back.
                         if (clientMsgWrk == null)
-                            spi.writeToSocket(msg, sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
                     }
                     catch (IgniteCheckedException e) {
                         if (log.isDebugEnabled())
@@ -4594,8 +4635,11 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             TcpDiscoverySpiState state = spiStateCopy();
 
+            long socketTimeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
+                spi.getSocketTimeout();
+
             if (state == CONNECTED) {
-                spi.writeToSocket(msg, sock, RES_OK);
+                spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
 
                 if (log.isDebugEnabled())
                     log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']');
@@ -4629,7 +4673,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     // Local node is stopping. Remote node should try next one.
                     res = RES_CONTINUE_JOIN;
 
-                spi.writeToSocket(msg, sock, res);
+                spi.writeToSocket(msg, sock, res, socketTimeout);
 
                 if (log.isDebugEnabled())
                     log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']');
@@ -4751,7 +4795,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 try {
                     prepareNodeAddedMessage(msg, clientNodeId, null, null);
 
-                    writeToSocket(sock, msg);
+                    writeToSocket(sock, msg, spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
+                        spi.getSocketTimeout());
                 }
                 finally {
                     clearNodeAddedMessage(msg);
@@ -4784,10 +4829,11 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * @param timeoutCtrl Timeout controller.
          * @return Ping result.
          * @throws InterruptedException If interrupted.
          */
-        public boolean ping() throws InterruptedException {
+        public boolean ping(IgniteSpiOperationTimeoutController timeoutCtrl) throws InterruptedException {
             if (spi.isNodeStopping0())
                 return false;
 
@@ -4813,7 +4859,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             try {
-                return fut.get(spi.ackTimeout, TimeUnit.MILLISECONDS);
+                return fut.get(timeoutCtrl.nextTimeoutChunk(spi.getAckTimeout()),
+                    TimeUnit.MILLISECONDS);
             }
             catch (IgniteInterruptedCheckedException ignored) {
                 throw new InterruptedException();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e37d32a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 126bf03..5ed946b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1248,13 +1248,15 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * @param msg Received message.
      * @param sock Socket.
      * @param res Integer response.
+     * @param timeout Socket timeout.
      * @throws IOException If IO failed or write timed out.
      */
     @SuppressWarnings("ThrowFromFinallyBlock")
-    protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res) throws IOException {
+    protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout)
+        throws IOException {
         assert sock != null;
 
-        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
+        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
 
         addTimeoutObject(obj);
 


[2/4] incubator-ignite git commit: ignite-752: support of failure detection threshold in ClientImpl

Posted by sb...@apache.org.
ignite-752: support of failure detection threshold in ClientImpl


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c3c0ef87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c3c0ef87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c3c0ef87

Branch: refs/heads/ignite-752
Commit: c3c0ef87be79d71aa922d9a11e2d205df19a7829
Parents: 8e37d32
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Jul 16 11:52:57 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Jul 16 11:52:57 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 83 ++++++++++++++------
 .../tcp/TcpClientDiscoverySpiSelfTest.java      | 14 ++--
 2 files changed, 69 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c3c0ef87/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 3f05f59..62b2e35 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -194,7 +194,8 @@ class ClientImpl extends TcpDiscoveryImpl {
             msgWorker.addMessage(SPI_STOP);
 
             try {
-                if (!leaveLatch.await(spi.netTimeout, MILLISECONDS))
+                if (!leaveLatch.await(spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
+                    spi.getNetworkTimeout(), MILLISECONDS))
                     U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']');
             }
             catch (InterruptedException ignored) {
@@ -272,7 +273,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (pingFuts.remove(nodeId, finalFut))
                             finalFut.onDone(false);
                     }
-                }, spi.netTimeout);
+                }, spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : spi.getNetworkTimeout());
 
                 sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
             }
@@ -456,13 +457,17 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         Collection<Throwable> errs = null;
 
-        long ackTimeout0 = spi.ackTimeout;
+        long ackTimeout0 = spi.getAckTimeout();
+
+        int reconCnt = 0;
 
         int connectAttempts = 1;
 
         UUID locNodeId = getLocalNodeId();
 
-        for (int i = 0; i < spi.reconCnt; i++) {
+        IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+
+        while (true) {
             boolean openSock = false;
 
             Socket sock = null;
@@ -470,7 +475,7 @@ class ClientImpl extends TcpDiscoveryImpl {
             try {
                 long tstamp = U.currentTimeMillis();
 
-                sock = spi.openSocket(addr);
+                sock = spi.openSocket(addr, timeoutCtrl);
 
                 openSock = true;
 
@@ -478,7 +483,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 req.client(true);
 
-                spi.writeToSocket(sock, req);
+                spi.writeToSocket(sock, req, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
 
@@ -499,7 +504,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 msg.client(true);
 
-                spi.writeToSocket(sock, msg);
+                spi.writeToSocket(sock, msg, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
 
@@ -522,6 +527,12 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 errs.add(e);
 
+                if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+                    break;
+
+                if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
+                    break;
+
                 if (!openSock) {
                     // Reconnect for the second time, if connection is not established.
                     if (connectAttempts < 2) {
@@ -533,7 +544,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                     break; // Don't retry if we can not establish connection.
                 }
 
-                if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
+                if (!spi.failureDetectionThresholdEnabled() && (e instanceof SocketTimeoutException ||
+                    X.hasCause(e, SocketTimeoutException.class))) {
                     ackTimeout0 *= 2;
 
                     if (!checkAckTimeout(ackTimeout0))
@@ -825,11 +837,17 @@ class ClientImpl extends TcpDiscoveryImpl {
         /** */
         private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
 
+        /** */
+        private final long socketTimeout;
+
         /**
          *
          */
         protected SocketWriter() {
             super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);
+
+            socketTimeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
+                spi.getSocketTimeout();
         }
 
         /**
@@ -893,7 +911,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     msgLsnr.apply(msg);
 
                 try {
-                    spi.writeToSocket(sock, msg);
+                    spi.writeToSocket(sock, msg, socketTimeout);
 
                     msg = null;
                 }
@@ -954,7 +972,8 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             Exception err = null;
 
-            long timeout = join ? spi.joinTimeout : spi.netTimeout;
+            long timeout = join ? spi.getJoinTimeout() : (spi.failureDetectionThresholdEnabled() ?
+                spi.failureDetectionThreshold() : spi.getNetworkTimeout());
 
             long startTime = U.currentTimeMillis();
 
@@ -966,11 +985,17 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (join) {
                             joinError(new IgniteSpiException("Join process timed out, connection failed and " +
                                 "failed to reconnect (consider increasing 'joinTimeout' configuration property) " +
-                                "[networkTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+                                "[networkTimeout=" + spi.getJoinTimeout() + ", sock=" + sock + ']'));
                         }
                         else
-                            U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
-                                "configuration  property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
+                            U.error(log, "Failed to reconnect to cluster " +
+                                (spi.failureDetectionThresholdEnabled() ?
+                                "(consider increasing 'failureDetectionThreshold' configuration  property) " +
+                                "[failureDetectionThreshold=" + spi.failureDetectionThreshold()
+                                :
+                                "(consider increasing 'networkTimeout' configuration  property) [networkTimeout="
+                                + spi.getNetworkTimeout())
+                                + ", sock=" + sock + ']');
 
                         return;
                     }
@@ -983,7 +1008,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                     try {
                         oldTimeout = sock.getSoTimeout();
 
-                        sock.setSoTimeout((int)spi.netTimeout);
+                        sock.setSoTimeout((int)(spi.failureDetectionThresholdEnabled() ?
+                            spi.failureDetectionThreshold() : spi.getNetworkTimeout()));
 
                         InputStream in = new BufferedInputStream(sock.getInputStream());
 
@@ -993,6 +1019,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                         List<TcpDiscoveryAbstractMessage> msgs = null;
 
                         while (!isInterrupted()) {
+                            // How to deal with failure detection threshold? It's bigger than networkTimeout and
+                            // we can't just decrease it on every unmarshal operation.
                             TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
 
                             if (msg instanceof TcpDiscoveryClientReconnectMessage) {
@@ -1028,10 +1056,21 @@ class ClientImpl extends TcpDiscoveryImpl {
                             log.error("Reconnect error [join=" + join + ", timeout=" + timeout + ']', e);
 
                         if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout) {
-                            String msg = join ? "Failed to connect to cluster (consider increasing 'joinTimeout' " +
-                                "configuration  property) [joinTimeout=" + spi.joinTimeout + ", err=" + e + ']' :
-                                "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
-                                    "configuration  property) [networkTimeout=" + spi.netTimeout + ", err=" + e + ']';
+                            String msg;
+
+                            if (join)
+                                msg = "Failed to connect to cluster (consider increasing 'joinTimeout' " +
+                                    "configuration  property) [joinTimeout=" + spi.getJoinTimeout() + ", err=" + e +
+                                    ']';
+                            else
+                                msg = "Failed to reconnect to cluster " +
+                                    (spi.failureDetectionThresholdEnabled() ?
+                                    "(consider increasing 'failureDetectionThreshold' configuration  property) " +
+                                    "[failureDetectionThreshold=" + spi.failureDetectionThreshold()
+                                    :
+                                    "(consider increasing 'networkTimeout' configuration  property) " +
+                                    "[networkTimeout=" + spi.getNetworkTimeout()) + ", sock=" + sock + ']';
+
 
                             U.warn(log, msg);
 
@@ -1094,7 +1133,7 @@ class ClientImpl extends TcpDiscoveryImpl {
             spi.stats.onJoinStarted();
 
             try {
-                final Socket sock = joinTopology(false, spi.joinTimeout);
+                final Socket sock = joinTopology(false, spi.getJoinTimeout());
 
                 if (sock == null) {
                     joinError(new IgniteSpiException("Join process timed out."));
@@ -1106,13 +1145,13 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 sockWriter.setSocket(sock);
 
-                if (spi.joinTimeout > 0) {
+                if (spi.getJoinTimeout() > 0) {
                     timer.schedule(new TimerTask() {
                         @Override public void run() {
                             if (joinLatch.getCount() > 0)
                                 queue.add(JOIN_TIMEOUT);
                         }
-                    }, spi.joinTimeout);
+                    }, spi.getJoinTimeout());
                 }
 
                 sockReader.setSocket(sock, locNode.clientRouterNodeId());
@@ -1124,7 +1163,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (joinLatch.getCount() > 0) {
                             joinError(new IgniteSpiException("Join process timed out, did not receive response for " +
                                 "join request (consider increasing 'joinTimeout' configuration property) " +
-                                "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+                                "[joinTimeout=" + spi.getJoinTimeout() + ", sock=" + sock + ']'));
 
                             break;
                         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c3c0ef87/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index ec6a526..c8f3942 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -1538,7 +1538,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException {
+            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
             waitFor(writeLock);
 
             boolean fail = false;
@@ -1556,17 +1556,18 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
                 sock.close();
             }
 
-            super.writeToSocket(sock, msg, bout);
+            super.writeToSocket(sock, msg, bout, timeout);
 
             if (afterWrite != null)
                 afterWrite.apply(msg, sock);
         }
 
         /** {@inheritDoc} */
-        @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
+        @Override protected Socket openSocket(InetSocketAddress sockAddr,
+            IgniteSpiOperationTimeoutController timeoutCtrl) throws IOException, IgniteSpiOperationTimeoutException {
             waitFor(openSockLock);
 
-            return super.openSocket(sockAddr);
+            return super.openSocket(sockAddr, new IgniteSpiOperationTimeoutController(this));
         }
 
         /**
@@ -1595,7 +1596,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res) throws IOException {
+        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout)
+            throws IOException {
             if (delayJoinAckFor != null && msg instanceof TcpDiscoveryJoinRequestMessage) {
                 TcpDiscoveryJoinRequestMessage msg0 = (TcpDiscoveryJoinRequestMessage)msg;
 
@@ -1613,7 +1615,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
                 }
             }
 
-            super.writeToSocket(msg, sock, res);
+            super.writeToSocket(msg, sock, res, timeout);
         }
 
         /** {@inheritDoc} */


[4/4] incubator-ignite git commit: ignite-752: added tests

Posted by sb...@apache.org.
ignite-752: added tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5fc818c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5fc818c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5fc818c0

Branch: refs/heads/ignite-752
Commit: 5fc818c05190f50dd6bfc753f54e74f422426c66
Parents: 0949c93
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Jul 16 16:22:45 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Jul 16 16:22:45 2015 +0300

----------------------------------------------------------------------
 .../configuration/IgniteConfiguration.java      |   8 +-
 .../IgniteSpiOperationTimeoutController.java    |   2 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  18 +-
 ...TcpDiscoverySpiFailureThresholdSelfTest.java | 215 +++++++++++++++++++
 .../IgniteSpiDiscoverySelfTestSuite.java        |   1 +
 5 files changed, 232 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fc818c0/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index b3d2bfc..49b53c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -190,7 +190,7 @@ public class IgniteConfiguration {
     public static final boolean DFLT_CACHE_SANITY_CHECK_ENABLED = true;
 
     /** Default failure detection threshold used by DiscoverySpi and CommunicationSpi in millis. */
-    public static final int DFLT_FAILURE_DETECTION_THRESHOLD = 10_000;
+    public static final long DFLT_FAILURE_DETECTION_THRESHOLD = 10_000;
 
     /** Optional grid name. */
     private String gridName;
@@ -370,7 +370,7 @@ public class IgniteConfiguration {
     private int timeSrvPortRange = DFLT_TIME_SERVER_PORT_RANGE;
 
     /** Failure detection threshold used by DiscoverySpi and CommunicationSpi. */
-    private int failureDetectionThreshold = DFLT_FAILURE_DETECTION_THRESHOLD;
+    private long failureDetectionThreshold = DFLT_FAILURE_DETECTION_THRESHOLD;
 
     /** Property names to include into node attributes. */
     private String[] includeProps;
@@ -1639,7 +1639,7 @@ public class IgniteConfiguration {
      * TODO: IGNITE-752
      * @return
      */
-    public int getFailureDetectionThreshold() {
+    public long getFailureDetectionThreshold() {
         return failureDetectionThreshold;
     }
 
@@ -1647,7 +1647,7 @@ public class IgniteConfiguration {
      * TODO: IGNITE-752
      * @param failureDetectionThreshold
      */
-    public void setFailureDetectionThreshold(int failureDetectionThreshold) {
+    public void setFailureDetectionThreshold(long failureDetectionThreshold) {
         this.failureDetectionThreshold = failureDetectionThreshold;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fc818c0/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
index ba95871..84ffd0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
@@ -48,7 +48,7 @@ public class IgniteSpiOperationTimeoutController {
         failureDetectionThreshold = adapter.failureDetectionThreshold();
 
         assert !failureDetectionThresholdEnabled || failureDetectionThreshold > 0 : " [failureDetectionThreshold=" +
-            failureDetectionThreshold + ", thresholdEnabled=" + failureDetectionThresholdEnabled + ']';
+            failureDetectionThreshold + ", thresholdEnabled=" + failureDetectionThresholdEnabled;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fc818c0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 5be7ab9..eb131b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1562,27 +1562,31 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
         initFailureDetectionThreshold();
 
-        if (!failureDetectionThresholdEnabled()) {
-            if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) {
+        if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) {
+            if (!failureDetectionThresholdEnabled()) {
                 if (ackTimeout == 0)
                     ackTimeout = DFLT_ACK_TIMEOUT_CLIENT;
 
                 if (sockTimeout == 0)
                     sockTimeout = DFLT_SOCK_TIMEOUT_CLIENT;
+            }
 
-                impl = new ClientImpl(this);
+            impl = new ClientImpl(this);
 
-                ctxInitLatch.countDown();
-            } else {
+            ctxInitLatch.countDown();
+        } else {
+            if (!failureDetectionThresholdEnabled()) {
                 if (ackTimeout == 0)
                     ackTimeout = DFLT_ACK_TIMEOUT;
 
                 if (sockTimeout == 0)
                     sockTimeout = DFLT_SOCK_TIMEOUT;
-
-                impl = new ServerImpl(this);
             }
 
+            impl = new ServerImpl(this);
+        }
+
+        if (!failureDetectionThresholdEnabled()) {
             assertParameter(netTimeout > 0, "networkTimeout > 0");
             assertParameter(sockTimeout > 0, "sockTimeout > 0");
             assertParameter(ackTimeout > 0, "ackTimeout > 0");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fc818c0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
new file mode 100644
index 0000000..db0d9c5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ *
+ */
+public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySelfTest {
+    /** */
+    private static TestTcpDiscoverySpi firstSpi;
+
+    /** */
+    private static TestTcpDiscoverySpi secondSpi;
+
+    /** */
+    private TcpDiscoveryIpFinder ipFinder =  new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected DiscoverySpi getSpi(int idx) {
+        TestTcpDiscoverySpi spi = new TestTcpDiscoverySpi();
+
+        if (idx == 0)
+            firstSpi = spi;
+        else
+            secondSpi = spi;
+
+        spi.setMetricsProvider(createMetricsProvider());
+        spi.setIpFinder(ipFinder);
+
+        return spi;
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testFailureDetectionThresholdEnabled() throws Exception {
+        assertTrue(firstSpi.failureDetectionThresholdEnabled());
+        assertTrue(secondSpi.failureDetectionThresholdEnabled());
+
+        assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD, firstSpi.failureDetectionThreshold());
+        assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD, secondSpi.failureDetectionThreshold());
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testFailureDetectionOnSocketOpen() throws Exception {
+        try {
+            ClusterNode node = secondSpi.getLocalNode();
+
+            firstSpi.openSocketTimeout = true;
+
+            assertFalse(firstSpi.pingNode(node.id()));
+            assertTrue(firstSpi.validTimeout);
+            assertTrue(firstSpi.err.getMessage().equals("Timeout: openSocketTimeout"));
+
+            firstSpi.openSocketTimeout = false;
+            firstSpi.openSocketTimeoutWait = true;
+
+            assertFalse(firstSpi.pingNode(node.id()));
+            assertTrue(firstSpi.validTimeout);
+            assertTrue(firstSpi.err.getMessage().equals("Timeout: openSocketTimeoutWait"));
+        }
+        finally {
+            firstSpi.resetState();
+        }
+    }
+
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testFailureDetectionOnSocketWrite() throws Exception {
+        try {
+            ClusterNode node = secondSpi.getLocalNode();
+
+            firstSpi.writeToSocketTimeoutWait = true;
+
+            assertFalse(firstSpi.pingNode(node.id()));
+            assertTrue(firstSpi.validTimeout);
+
+            firstSpi.writeToSocketTimeoutWait = false;
+
+            assertTrue(firstSpi.pingNode(node.id()));
+            assertTrue(firstSpi.validTimeout);
+        }
+        finally {
+            firstSpi.resetState();
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** */
+        private boolean openSocketTimeout;
+
+        /** */
+        private boolean openSocketTimeoutWait;
+
+        /** */
+        private boolean writeToSocketTimeoutWait;
+
+        /** */
+        private boolean validTimeout = true;
+
+        /** */
+        private IgniteSpiOperationTimeoutException err;
+
+
+        /** {@inheritDoc} */
+        @Override protected Socket openSocket(InetSocketAddress sockAddr,
+            IgniteSpiOperationTimeoutController timeoutCtrl) throws IOException, IgniteSpiOperationTimeoutException {
+
+            if (openSocketTimeout) {
+                err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeout");
+                throw err;
+            }
+            else if (openSocketTimeoutWait) {
+                long timeout = timeoutCtrl.nextTimeoutChunk(0);
+
+                try {
+                    Thread.sleep(timeout);
+                }
+                catch (InterruptedException e) {
+                    // Ignore
+                }
+
+                try {
+                    timeoutCtrl.nextTimeoutChunk(0);
+                }
+                catch (IgniteSpiOperationTimeoutException e) {
+                    err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeoutWait");
+                    throw err;
+                }
+            }
+
+            Socket sock = super.openSocket(sockAddr, timeoutCtrl);
+
+            try {
+                Thread.sleep(1500);
+            } catch (InterruptedException e) {
+                // Ignore
+            }
+
+            return sock;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
+            throws IOException, IgniteCheckedException {
+            if (!(msg instanceof TcpDiscoveryPingRequest)) {
+                super.writeToSocket(sock, msg, timeout);
+                return;
+            }
+
+            if (timeout >= IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD) {
+                validTimeout = false;
+
+                throw new IgniteCheckedException("Invalid timeout: " + timeout);
+            }
+
+            if (writeToSocketTimeoutWait) {
+                try {
+                    Thread.sleep(timeout);
+                }
+                catch (InterruptedException e) {
+                    // Ignore
+                }
+            }
+            else
+                super.writeToSocket(sock, msg, timeout);
+        }
+
+        /**
+         *
+         */
+        private void resetState() {
+            openSocketTimeout = false;
+            openSocketTimeoutWait = false;
+            writeToSocketTimeoutWait = false;
+            err = null;
+            validTimeout = true;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fc818c0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 6f59f14..357fd93 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -44,6 +44,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
 
         suite.addTest(new TestSuite(TcpDiscoverySelfTest.class));
         suite.addTest(new TestSuite(TcpDiscoverySpiSelfTest.class));
+        suite.addTest(new TestSuite(TcpDiscoverySpiFailureThresholdSelfTest.class));
         suite.addTest(new TestSuite(TcpDiscoverySpiStartStopSelfTest.class));
         suite.addTest(new TestSuite(TcpDiscoverySpiConfigSelfTest.class));
         suite.addTest(new TestSuite(TcpDiscoveryMarshallerCheckSelfTest.class));


[3/4] incubator-ignite git commit: ignite-752: failure detection threshold support in TcpCommunicationSpi

Posted by sb...@apache.org.
ignite-752: failure detection threshold support in TcpCommunicationSpi


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0949c93f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0949c93f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0949c93f

Branch: refs/heads/ignite-752
Commit: 0949c93fe1d1dd435b906fb1d79263e9073c0f1b
Parents: c3c0ef8
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Jul 16 13:25:48 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Jul 16 13:25:48 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  20 ++--
 .../IgniteSpiOperationTimeoutController.java    |   7 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 106 +++++++++++++++----
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   2 +-
 4 files changed, 102 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 422ce81..500c461 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -203,15 +203,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         spiCtx = new GridDummySpiContext(locNode, true, spiCtx);
     }
 
-    /** {@inheritDoc} */
-    @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
-        if (!failureDetectionThresholdEnabled) {
-            failureDetectionThreshold = ignite.configuration().getFailureDetectionThreshold();
-
-            assertParameter(failureDetectionThreshold > 0, "failureDetectionThreshold > 0");
-        }
-    }
-
     /**
      * Inject ignite instance.
      */
@@ -579,6 +570,17 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
 
     /**
      * TODO: IGNITE-752
+     */
+    protected void initFailureDetectionThreshold() {
+        if (failureDetectionThresholdEnabled) {
+            failureDetectionThreshold = ignite.configuration().getFailureDetectionThreshold();
+
+            assertParameter(failureDetectionThreshold > 0, "failureDetectionThreshold > 0");
+        }
+    }
+
+    /**
+     * TODO: IGNITE-752
      * @param enabled
      */
     public void failureDetectionThresholdEnabled(boolean enabled) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
index 3ae4fa4..ba95871 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
@@ -46,6 +46,9 @@ public class IgniteSpiOperationTimeoutController {
     public IgniteSpiOperationTimeoutController(IgniteSpiAdapter adapter) {
         failureDetectionThresholdEnabled = adapter.failureDetectionThresholdEnabled();
         failureDetectionThreshold = adapter.failureDetectionThreshold();
+
+        assert !failureDetectionThresholdEnabled || failureDetectionThreshold > 0 : " [failureDetectionThreshold=" +
+            failureDetectionThreshold + ", thresholdEnabled=" + failureDetectionThresholdEnabled + ']';
     }
 
     /**
@@ -70,8 +73,8 @@ public class IgniteSpiOperationTimeoutController {
             lastOperStartTs = curTs;
 
             if (timeout <= 0)
-                throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase failure detection" +
-                    " threshold using IgniteConfiguration.setFailureDetectionThreshold() or set SPI specific timeouts" +
+                throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " +
+                    "'failureDetectionThreshold' configuration property or set SPI specific timeouts" +
                     " manually. Current failure detection threshold: " + failureDetectionThreshold);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 4ca2995..b2bc9eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -970,6 +970,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     @IgniteSpiConfiguration(optional = true)
     public void setConnectTimeout(long connTimeout) {
         this.connTimeout = connTimeout;
+
+        failureDetectionThresholdEnabled(false);
     }
 
     /** {@inheritDoc} */
@@ -992,6 +994,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     @IgniteSpiConfiguration(optional = true)
     public void setMaxConnectTimeout(long maxConnTimeout) {
         this.maxConnTimeout = maxConnTimeout;
+
+        failureDetectionThresholdEnabled(false);
     }
 
     /** {@inheritDoc} */
@@ -1010,6 +1014,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     @IgniteSpiConfiguration(optional = true)
     public void setReconnectCount(int reconCnt) {
         this.reconCnt = reconCnt;
+
+        failureDetectionThresholdEnabled(false);
     }
 
     /** {@inheritDoc} */
@@ -1239,6 +1245,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
         nodeIdMsg = new NodeIdMessage(getLocalNodeId());
 
+        initFailureDetectionThreshold();
+
         assertParameter(locPort > 1023, "locPort > 1023");
         assertParameter(locPort <= 0xffff, "locPort < 0xffff");
         assertParameter(locPortRange >= 0, "locPortRange >= 0");
@@ -1247,10 +1255,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0");
         assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
         assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
-        assertParameter(reconCnt > 0, "reconnectCnt > 0");
         assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
-        assertParameter(connTimeout >= 0, "connTimeout >= 0");
-        assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
+
+        if (!failureDetectionThresholdEnabled()) {
+            assertParameter(reconCnt > 0, "reconnectCnt > 0");
+            assertParameter(connTimeout >= 0, "connTimeout >= 0");
+            assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
+        }
+
         assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0");
         assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0");
         assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0");
@@ -1260,7 +1272,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 "Specified 'unackedMsgsBufSize' is too low, it should be at least 'msgQueueLimit * 5'.");
 
             assertParameter(unackedMsgsBufSize >= ackSndThreshold * 5,
-                "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'.");
+                            "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'.");
         }
 
         try {
@@ -1326,9 +1338,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             log.debug(configInfo("sockRcvBuf", sockRcvBuf));
             log.debug(configInfo("shmemPort", shmemPort));
             log.debug(configInfo("msgQueueLimit", msgQueueLimit));
-            log.debug(configInfo("connTimeout", connTimeout));
-            log.debug(configInfo("maxConnTimeout", maxConnTimeout));
-            log.debug(configInfo("reconCnt", reconCnt));
+
+            if (failureDetectionThresholdEnabled()) {
+                log.debug(configInfo("connTimeout", connTimeout));
+                log.debug(configInfo("maxConnTimeout", maxConnTimeout));
+                log.debug(configInfo("reconCnt", reconCnt));
+            }
+            else
+                log.debug(configInfo("failureDetectionThreshold", failureDetectionThreshold()));
+
             log.debug(configInfo("sockWriteTimeout", sockWriteTimeout));
             log.debug(configInfo("ackSndThreshold", ackSndThreshold));
             log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize));
@@ -1850,17 +1868,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         long connTimeout0 = connTimeout;
 
+        IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(this);
+
         while (true) {
             GridCommunicationClient client;
 
             try {
                 client = new GridShmemCommunicationClient(metricsLsnr,
                     port,
-                    connTimeout,
+                    timeoutCtrl.nextTimeoutChunk(connTimeout),
                     log,
                     getSpiContext().messageFormatter());
             }
             catch (IgniteCheckedException e) {
+                if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+                    throw e;
+
                 // Reconnect for the second time, if connection is not established.
                 if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) {
                     connectAttempts++;
@@ -1872,15 +1895,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             try {
-                safeHandshake(client, null, node.id(), connTimeout0);
+                safeHandshake(client, null, node.id(), timeoutCtrl.nextTimeoutChunk(connTimeout0));
             }
-            catch (HandshakeTimeoutException e) {
+            catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
+                client.forceClose();
+
+                if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException ||
+                    timeoutCtrl.checkFailureDetectionThresholdReached(e))) {
+                    log.debug("Handshake timed out (failure threshold reached) [failureDetectionThreshold=" +
+                        failureDetectionThreshold() + ", err=" + e.getMessage() + ", client=" + client + ']');
+
+                    throw e;
+                }
+
+                assert !failureDetectionThresholdEnabled();
+
                 if (log.isDebugEnabled())
-                    log.debug("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
+                    log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
                         ", err=" + e.getMessage() + ", client=" + client + ']');
 
-                client.forceClose();
-
                 if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
                     if (log.isDebugEnabled())
                         log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
@@ -1889,8 +1922,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             ", err=" + e.getMessage() + ", client=" + client + ']');
 
                     throw e;
-                }
-                else {
+                } else {
                     attempt++;
 
                     connTimeout0 *= 2;
@@ -1994,6 +2026,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
             int attempt = 1;
 
+            IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(this);
+
             while (!conn) { // Reconnection on handshake timeout.
                 try {
                     SocketChannel ch = SocketChannel.open();
@@ -2020,9 +2054,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     long rcvCnt = -1;
 
                     try {
-                        ch.socket().connect(addr, (int)connTimeout);
+                        ch.socket().connect(addr, (int)timeoutCtrl.nextTimeoutChunk(connTimeout));
 
-                        rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0);
+                        rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), timeoutCtrl.nextTimeoutChunk(connTimeout0));
 
                         if (rcvCnt == -1)
                             return null;
@@ -2056,14 +2090,38 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         }
                     }
                 }
-                catch (HandshakeTimeoutException e) {
+                catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
                     if (client != null) {
                         client.forceClose();
 
                         client = null;
                     }
 
-                    onException("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
+                    if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException ||
+                        timeoutCtrl.checkFailureDetectionThresholdReached(e))) {
+
+                        String msg = "Handshake timed out (failure detection threshold is reached) " +
+                            "[failureDetectionThreshold=" + failureDetectionThreshold() + ", addr=" + addr + ']';
+
+                        onException(msg, e);
+
+                        if (log.isDebugEnabled())
+                            log.debug(msg);
+
+                        if (errs == null)
+                            errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
+                                "Make sure that each GridComputeTask and GridCacheTransaction has a timeout set " +
+                                "in order to prevent parties from waiting forever in case of network issues " +
+                                "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+
+                        errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+
+                        break;
+                    }
+
+                    assert !failureDetectionThresholdEnabled();
+
+                    onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
                         ", addr=" + addr + ']', e);
 
                     if (log.isDebugEnabled())
@@ -2108,7 +2166,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (log.isDebugEnabled())
                         log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
 
-                    if (X.hasCause(e, SocketTimeoutException.class))
+                    boolean failureDetThrReached = timeoutCtrl.checkFailureDetectionThresholdReached(e);
+
+                    if (failureDetThrReached)
+                        LT.warn(log, null, "Connect timed out (consider increasing 'failureDetectionThreshold' " +
+                            "configuration property) [addr=" + addr + ", failureDetectionThreshold=" +
+                            failureDetectionThreshold() + ']');
+                    else if (X.hasCause(e, SocketTimeoutException.class))
                         LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " +
                             "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']');
 
@@ -2121,7 +2185,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
 
                     // Reconnect for the second time, if connection is not established.
-                    if (connectAttempts < 2 &&
+                    if (!failureDetThrReached && connectAttempts < 2 &&
                         (e instanceof ConnectException || X.hasCause(e, ConnectException.class))) {
                         connectAttempts++;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0949c93f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 5ed946b..5be7ab9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1560,7 +1560,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
     /** {@inheritDoc} */
     @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
-        super.spiStart(gridName);
+        initFailureDetectionThreshold();
 
         if (!failureDetectionThresholdEnabled()) {
             if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) {