You are viewing a plain text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@ignite.apache.org by al...@apache.org on 2020/07/21 13:52:51 UTC

[ignite] branch ignite-2.9 updated: IGNITE-13016 : Timeouts and performance of backward checking of failed nodes is fixed. - Fixes #7838.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch ignite-2.9
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-2.9 by this push:
     new 906827b  IGNITE-13016 : Timeouts and performance of backward checking of failed nodes is fixed. - Fixes #7838.
906827b is described below

commit 906827b506e01d97bff597562ef53cd15fd5b4fc
Author: Vladimir Steshin <vl...@gmail.com>
AuthorDate: Tue Jul 21 13:24:25 2020 +0300

    IGNITE-13016 : Timeouts and performance of backward checking of failed nodes is fixed. - Fixes #7838.
    
    Signed-off-by: Sergey Chugunov <se...@gmail.com>
    (cherry picked from commit 03ee85695014ff6aaa87e256d330d32342d34224)
---
 .../ignite/spi/discovery/tcp/ServerImpl.java       | 97 +++++++++++++++-------
 .../GridFailFastNodeFailureDetectionSelfTest.java  |  2 +-
 2 files changed, 68 insertions(+), 31 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 3d224e3..287347d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -54,9 +54,11 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLServerSocket;
@@ -383,17 +385,14 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         lastRingMsgSentTime = 0;
 
-        long msgExchangeTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
-            spi.getSocketTimeout() + spi.getAckTimeout();
-
         // Since we take in account time of last sent message, the interval should be quite short to give enough piece
         // of failure detection timeout as send-and-acknowledge timeout of the message to send.
-        connCheckInterval = Math.min(msgExchangeTimeout / 4, MAX_CON_CHECK_INTERVAL);
+        connCheckInterval = Math.min(effectiveExchangeTimeout() / 4, MAX_CON_CHECK_INTERVAL);
 
         utilityPool = new IgniteThreadPoolExecutor("disco-pool",
             spi.ignite().name(),
             0,
-            1,
+            4,
             2000,
             new LinkedBlockingQueue<>());
 
@@ -2008,6 +2007,12 @@ class ServerImpl extends TcpDiscoveryImpl {
         return threads;
     }
 
+    /** @return Timeout of one message exchange: failure detection timeout if enabled, else socket + ack timeout. */
+    protected long effectiveExchangeTimeout() {
+        return spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+            spi.getSocketTimeout() + spi.getAckTimeout();
+    }
+
     /** {@inheritDoc} */
     @Override public void updateMetrics(UUID nodeId,
         ClusterMetrics metrics,
@@ -6872,9 +6877,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                         // Need to check connectivity to it.
                         long rcvdTime = lastRingMsgReceivedTime;
                         long now = System.nanoTime();
+                        long timeThreshold = rcvdTime + U.millisToNanos(effectiveExchangeTimeout());
 
-                        // We got message from previous in less than double connection check interval.
-                        boolean ok = rcvdTime + U.millisToNanos(connCheckInterval) * 2 >= now;
+                        // We got message from previous in less than effective exchange timeout.
+                        boolean ok = timeThreshold > now;
                         TcpDiscoveryNode previous = null;
 
                         if (ok) {
@@ -6892,18 +6898,10 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                             if (previous != null && !previous.id().equals(nodeId) &&
                                 (req.checkPreviousNodeId() == null || previous.id().equals(req.checkPreviousNodeId()))) {
-                                Collection<InetSocketAddress> nodeAddrs =
-                                    spi.getNodeAddresses(previous, false);
+                                Collection<InetSocketAddress> nodeAddrs = spi.getNodeAddresses(previous, false);
 
-                                for (InetSocketAddress addr : nodeAddrs) {
-                                    // Connection refused may be got if node doesn't listen
-                                    // (or blocked by firewall, but anyway assume it is dead).
-                                    if (!isConnectionRefused(addr)) {
-                                        liveAddr = addr;
-
-                                        break;
-                                    }
-                                }
+                                liveAddr = checkConnection(new ArrayList<>(nodeAddrs),
+                                    (int)U.nanosToMillis(timeThreshold - now));
 
                                 if (log.isInfoEnabled())
                                     log.info("Connection check done [liveAddr=" + liveAddr
@@ -7366,22 +7364,61 @@ class ServerImpl extends TcpDiscoveryImpl {
             lastRingMsgReceivedTime = System.nanoTime();
         }
 
-        /**
-         * @param addr Address to check.
-         * @return {@code True} if got connection refused on connect try.
-         */
-        private boolean isConnectionRefused(SocketAddress addr) {
-            try (Socket sock = new Socket()) {
-                sock.connect(addr, 100);
+        /** @return Address from {@code addrs} that accepted a connection in {@code timeout} ms or {@code null}. */
+        private InetSocketAddress checkConnection(List<InetSocketAddress> addrs, int timeout) {
+            AtomicReference<InetSocketAddress> liveAddrHolder = new AtomicReference<>();
+
+            CountDownLatch latch = new CountDownLatch(addrs.size());
+
+            int addrLeft = addrs.size();
+            // Work queue is unbounded, so the pool never runs more than max(core size, 1) threads at once.
+            int threadsLeft = Math.max(1, utilityPool.getCorePoolSize());
+
+            AtomicInteger addrIdx = new AtomicInteger();
+
+            while (addrLeft > 0) {
+                int addrPerThread = addrLeft / threadsLeft + (addrLeft % threadsLeft > 0 ? 1 : 0);
+
+                addrLeft -= addrPerThread;
+
+                --threadsLeft;
+
+                utilityPool.execute(new Runnable() {
+                    private final int addrsToCheck = addrPerThread;
+
+                    /** Tries this task's share of addresses, skipping connects once any address is reachable. */
+                    @Override public void run() {
+                        // Socket.connect() treats 0 as an infinite timeout, so never pass less than 1 ms.
+                        int perAddrTimeout = Math.max(1, timeout / addrsToCheck);
+                        for (int i = 0; i < addrsToCheck; ++i) {
+                            InetSocketAddress addr = addrs.get(addrIdx.getAndIncrement());
+
+                            try (Socket sock = new Socket()) {
+                                if (liveAddrHolder.get() == null) {
+                                    sock.connect(addr, perAddrTimeout);
+
+                                    liveAddrHolder.compareAndSet(null, addr);
+                                }
+                            }
+                            catch (Exception ignored) {
+                                // Connect failed or timed out: the address is not considered alive.
+                            }
+                            finally {
+                                latch.countDown();
+                            }
+                        }
+                    }
+                });
             }
-            catch (ConnectException e) {
-                return true;
+            // Wait at most 'timeout' ms in total; results of checks still running after that are ignored.
+            try {
+                latch.await(timeout, TimeUnit.MILLISECONDS);
             }
-            catch (IOException e) {
-                return false;
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
             }
 
-            return false;
+            return liveAddrHolder.get();
         }
 
         /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
index 73cb5f7..d727550 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
@@ -107,7 +107,7 @@ public class GridFailFastNodeFailureDetectionSelfTest extends GridCommonAbstract
 
         failNode(ignite1);
 
-        assert failLatch.await(1500, MILLISECONDS);
+        assert failLatch.await(ignite1.configuration().getFailureDetectionTimeout(), MILLISECONDS);
     }
 
     /**