You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/16 15:22:58 UTC

[2/4] incubator-ignite git commit: ignite-752: support of failure detection threshold in ClientImpl

ignite-752: support of failure detection threshold in ClientImpl


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c3c0ef87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c3c0ef87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c3c0ef87

Branch: refs/heads/ignite-752
Commit: c3c0ef87be79d71aa922d9a11e2d205df19a7829
Parents: 8e37d32
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Jul 16 11:52:57 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Jul 16 11:52:57 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 83 ++++++++++++++------
 .../tcp/TcpClientDiscoverySpiSelfTest.java      | 14 ++--
 2 files changed, 69 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c3c0ef87/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 3f05f59..62b2e35 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -194,7 +194,8 @@ class ClientImpl extends TcpDiscoveryImpl {
             msgWorker.addMessage(SPI_STOP);
 
             try {
-                if (!leaveLatch.await(spi.netTimeout, MILLISECONDS))
+                if (!leaveLatch.await(spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
+                    spi.getNetworkTimeout(), MILLISECONDS))
                     U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']');
             }
             catch (InterruptedException ignored) {
@@ -272,7 +273,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (pingFuts.remove(nodeId, finalFut))
                             finalFut.onDone(false);
                     }
-                }, spi.netTimeout);
+                }, spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : spi.getNetworkTimeout());
 
                 sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
             }
@@ -456,13 +457,17 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         Collection<Throwable> errs = null;
 
-        long ackTimeout0 = spi.ackTimeout;
+        long ackTimeout0 = spi.getAckTimeout();
+
+        int reconCnt = 0;
 
         int connectAttempts = 1;
 
         UUID locNodeId = getLocalNodeId();
 
-        for (int i = 0; i < spi.reconCnt; i++) {
+        IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+
+        while (true) {
             boolean openSock = false;
 
             Socket sock = null;
@@ -470,7 +475,7 @@ class ClientImpl extends TcpDiscoveryImpl {
             try {
                 long tstamp = U.currentTimeMillis();
 
-                sock = spi.openSocket(addr);
+                sock = spi.openSocket(addr, timeoutCtrl);
 
                 openSock = true;
 
@@ -478,7 +483,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 req.client(true);
 
-                spi.writeToSocket(sock, req);
+                spi.writeToSocket(sock, req, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
 
@@ -499,7 +504,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 msg.client(true);
 
-                spi.writeToSocket(sock, msg);
+                spi.writeToSocket(sock, msg, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
 
@@ -522,6 +527,12 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 errs.add(e);
 
+                if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+                    break;
+
+                if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
+                    break;
+
                 if (!openSock) {
                     // Reconnect for the second time, if connection is not established.
                     if (connectAttempts < 2) {
@@ -533,7 +544,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                     break; // Don't retry if we can not establish connection.
                 }
 
-                if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
+                if (!spi.failureDetectionThresholdEnabled() && (e instanceof SocketTimeoutException ||
+                    X.hasCause(e, SocketTimeoutException.class))) {
                     ackTimeout0 *= 2;
 
                     if (!checkAckTimeout(ackTimeout0))
@@ -825,11 +837,17 @@ class ClientImpl extends TcpDiscoveryImpl {
         /** */
         private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
 
+        /** */
+        private final long socketTimeout;
+
         /**
          *
          */
         protected SocketWriter() {
             super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);
+
+            socketTimeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
+                spi.getSocketTimeout();
         }
 
         /**
@@ -893,7 +911,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     msgLsnr.apply(msg);
 
                 try {
-                    spi.writeToSocket(sock, msg);
+                    spi.writeToSocket(sock, msg, socketTimeout);
 
                     msg = null;
                 }
@@ -954,7 +972,8 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             Exception err = null;
 
-            long timeout = join ? spi.joinTimeout : spi.netTimeout;
+            long timeout = join ? spi.getJoinTimeout() : (spi.failureDetectionThresholdEnabled() ?
+                spi.failureDetectionThreshold() : spi.getNetworkTimeout());
 
             long startTime = U.currentTimeMillis();
 
@@ -966,11 +985,17 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (join) {
                             joinError(new IgniteSpiException("Join process timed out, connection failed and " +
                                 "failed to reconnect (consider increasing 'joinTimeout' configuration property) " +
-                                "[networkTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+                                "[networkTimeout=" + spi.getJoinTimeout() + ", sock=" + sock + ']'));
                         }
                         else
-                            U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
-                                "configuration  property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
+                            U.error(log, "Failed to reconnect to cluster " +
+                                (spi.failureDetectionThresholdEnabled() ?
+                                "(consider increasing 'failureDetectionThreshold' configuration  property) " +
+                                "[failureDetectionThreshold=" + spi.failureDetectionThreshold()
+                                :
+                                "(consider increasing 'networkTimeout' configuration  property) [networkTimeout="
+                                + spi.getNetworkTimeout())
+                                + ", sock=" + sock + ']');
 
                         return;
                     }
@@ -983,7 +1008,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                     try {
                         oldTimeout = sock.getSoTimeout();
 
-                        sock.setSoTimeout((int)spi.netTimeout);
+                        sock.setSoTimeout((int)(spi.failureDetectionThresholdEnabled() ?
+                            spi.failureDetectionThreshold() : spi.getNetworkTimeout()));
 
                         InputStream in = new BufferedInputStream(sock.getInputStream());
 
@@ -993,6 +1019,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                         List<TcpDiscoveryAbstractMessage> msgs = null;
 
                         while (!isInterrupted()) {
+                            // How to deal with failure detection threshold? It's bigger than networkTimeout and
+                            // we can't just decrease it on every unmarshal operation.
                             TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
 
                             if (msg instanceof TcpDiscoveryClientReconnectMessage) {
@@ -1028,10 +1056,21 @@ class ClientImpl extends TcpDiscoveryImpl {
                             log.error("Reconnect error [join=" + join + ", timeout=" + timeout + ']', e);
 
                         if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout) {
-                            String msg = join ? "Failed to connect to cluster (consider increasing 'joinTimeout' " +
-                                "configuration  property) [joinTimeout=" + spi.joinTimeout + ", err=" + e + ']' :
-                                "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
-                                    "configuration  property) [networkTimeout=" + spi.netTimeout + ", err=" + e + ']';
+                            String msg;
+
+                            if (join)
+                                msg = "Failed to connect to cluster (consider increasing 'joinTimeout' " +
+                                    "configuration  property) [joinTimeout=" + spi.getJoinTimeout() + ", err=" + e +
+                                    ']';
+                            else
+                                msg = "Failed to reconnect to cluster " +
+                                    (spi.failureDetectionThresholdEnabled() ?
+                                    "(consider increasing 'failureDetectionThreshold' configuration  property) " +
+                                    "[failureDetectionThreshold=" + spi.failureDetectionThreshold()
+                                    :
+                                    "(consider increasing 'networkTimeout' configuration  property) " +
+                                    "[networkTimeout=" + spi.getNetworkTimeout()) + ", sock=" + sock + ']';
+
 
                             U.warn(log, msg);
 
@@ -1094,7 +1133,7 @@ class ClientImpl extends TcpDiscoveryImpl {
             spi.stats.onJoinStarted();
 
             try {
-                final Socket sock = joinTopology(false, spi.joinTimeout);
+                final Socket sock = joinTopology(false, spi.getJoinTimeout());
 
                 if (sock == null) {
                     joinError(new IgniteSpiException("Join process timed out."));
@@ -1106,13 +1145,13 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 sockWriter.setSocket(sock);
 
-                if (spi.joinTimeout > 0) {
+                if (spi.getJoinTimeout() > 0) {
                     timer.schedule(new TimerTask() {
                         @Override public void run() {
                             if (joinLatch.getCount() > 0)
                                 queue.add(JOIN_TIMEOUT);
                         }
-                    }, spi.joinTimeout);
+                    }, spi.getJoinTimeout());
                 }
 
                 sockReader.setSocket(sock, locNode.clientRouterNodeId());
@@ -1124,7 +1163,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (joinLatch.getCount() > 0) {
                             joinError(new IgniteSpiException("Join process timed out, did not receive response for " +
                                 "join request (consider increasing 'joinTimeout' configuration property) " +
-                                "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+                                "[joinTimeout=" + spi.getJoinTimeout() + ", sock=" + sock + ']'));
 
                             break;
                         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c3c0ef87/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index ec6a526..c8f3942 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -1538,7 +1538,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException {
+            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
             waitFor(writeLock);
 
             boolean fail = false;
@@ -1556,17 +1556,18 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
                 sock.close();
             }
 
-            super.writeToSocket(sock, msg, bout);
+            super.writeToSocket(sock, msg, bout, timeout);
 
             if (afterWrite != null)
                 afterWrite.apply(msg, sock);
         }
 
         /** {@inheritDoc} */
-        @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
+        @Override protected Socket openSocket(InetSocketAddress sockAddr,
+            IgniteSpiOperationTimeoutController timeoutCtrl) throws IOException, IgniteSpiOperationTimeoutException {
             waitFor(openSockLock);
 
-            return super.openSocket(sockAddr);
+            return super.openSocket(sockAddr, new IgniteSpiOperationTimeoutController(this));
         }
 
         /**
@@ -1595,7 +1596,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res) throws IOException {
+        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout)
+            throws IOException {
             if (delayJoinAckFor != null && msg instanceof TcpDiscoveryJoinRequestMessage) {
                 TcpDiscoveryJoinRequestMessage msg0 = (TcpDiscoveryJoinRequestMessage)msg;
 
@@ -1613,7 +1615,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
                 }
             }
 
-            super.writeToSocket(msg, sock, res);
+            super.writeToSocket(msg, sock, res, timeout);
         }
 
         /** {@inheritDoc} */