You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2020/07/21 10:33:36 UTC
[ignite] branch master updated: IGNITE-13016 : Timeouts and
performance of backward checking of failed nodes is fixed. - Fixes #7838.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 03ee856 IGNITE-13016 : Timeouts and performance of backward checking of failed nodes is fixed. - Fixes #7838.
03ee856 is described below
commit 03ee85695014ff6aaa87e256d330d32342d34224
Author: Vladimir Steshin <vl...@gmail.com>
AuthorDate: Tue Jul 21 13:24:25 2020 +0300
IGNITE-13016 : Timeouts and performance of backward checking of failed nodes is fixed. - Fixes #7838.
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../ignite/spi/discovery/tcp/ServerImpl.java | 97 +++++++++++++++-------
.../GridFailFastNodeFailureDetectionSelfTest.java | 2 +-
2 files changed, 68 insertions(+), 31 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index a43d8b1..06f8b75 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -54,9 +54,11 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLServerSocket;
@@ -384,17 +386,14 @@ class ServerImpl extends TcpDiscoveryImpl {
lastRingMsgSentTime = 0;
- long msgExchangeTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
- spi.getSocketTimeout() + spi.getAckTimeout();
-
// Since we take in account time of last sent message, the interval should be quite short to give enough piece
// of failure detection timeout as send-and-acknowledge timeout of the message to send.
- connCheckInterval = Math.min(msgExchangeTimeout / 4, MAX_CON_CHECK_INTERVAL);
+ connCheckInterval = Math.min(effectiveExchangeTimeout() / 4, MAX_CON_CHECK_INTERVAL);
utilityPool = new IgniteThreadPoolExecutor("disco-pool",
spi.ignite().name(),
0,
- 1,
+ 4,
2000,
new LinkedBlockingQueue<>());
@@ -2009,6 +2008,12 @@ class ServerImpl extends TcpDiscoveryImpl {
return threads;
}
+ /** @return Complete timeout of a single message exchange operation on an established connection. */
+ protected long effectiveExchangeTimeout() {
+ return spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+ spi.getSocketTimeout() + spi.getAckTimeout();
+ }
+
/** {@inheritDoc} */
@Override public void updateMetrics(UUID nodeId,
ClusterMetrics metrics,
@@ -6873,9 +6878,10 @@ class ServerImpl extends TcpDiscoveryImpl {
// Need to check connectivity to it.
long rcvdTime = lastRingMsgReceivedTime;
long now = System.nanoTime();
+ long timeThreshold = rcvdTime + U.millisToNanos(effectiveExchangeTimeout());
- // We got message from previous in less than double connection check interval.
- boolean ok = rcvdTime + U.millisToNanos(connCheckInterval) * 2 >= now;
+ // We got a message from the previous node in less than the effective exchange timeout.
+ boolean ok = timeThreshold > now;
TcpDiscoveryNode previous = null;
if (ok) {
@@ -6893,18 +6899,10 @@ class ServerImpl extends TcpDiscoveryImpl {
if (previous != null && !previous.id().equals(nodeId) &&
(req.checkPreviousNodeId() == null || previous.id().equals(req.checkPreviousNodeId()))) {
- Collection<InetSocketAddress> nodeAddrs =
- spi.getNodeAddresses(previous, false);
+ Collection<InetSocketAddress> nodeAddrs = spi.getNodeAddresses(previous, false);
- for (InetSocketAddress addr : nodeAddrs) {
- // Connection refused may be got if node doesn't listen
- // (or blocked by firewall, but anyway assume it is dead).
- if (!isConnectionRefused(addr)) {
- liveAddr = addr;
-
- break;
- }
- }
+ liveAddr = checkConnection(new ArrayList<>(nodeAddrs),
+ (int)U.nanosToMillis(timeThreshold - now));
if (log.isInfoEnabled())
log.info("Connection check done [liveAddr=" + liveAddr
@@ -7367,22 +7365,61 @@ class ServerImpl extends TcpDiscoveryImpl {
lastRingMsgReceivedTime = System.nanoTime();
}
- /**
- * @param addr Address to check.
- * @return {@code True} if got connection refused on connect try.
- */
- private boolean isConnectionRefused(SocketAddress addr) {
- try (Socket sock = new Socket()) {
- sock.connect(addr, 100);
+ /** @return Alive address that was successfully connected to, or {@code null} if none could be reached. */
+ private InetSocketAddress checkConnection(List<InetSocketAddress> addrs, int timeout) {
+ AtomicReference<InetSocketAddress> liveAddrHolder = new AtomicReference<>();
+
+ CountDownLatch latch = new CountDownLatch(addrs.size());
+
+ int addrLeft = addrs.size();
+
+ int threadsLeft = utilityPool.getMaximumPoolSize();
+
+ AtomicInteger addrIdx = new AtomicInteger();
+
+ while (addrLeft > 0) {
+ int addrPerThread = addrLeft / threadsLeft + (addrLeft % threadsLeft > 0 ? 1 : 0);
+
+ addrLeft -= addrPerThread;
+
+ --threadsLeft;
+
+ utilityPool.execute(new Thread() {
+ private final int addrsToCheck = addrPerThread;
+
+ /** */
+ @Override public void run() {
+ int perAddrTimeout = timeout / addrsToCheck;
+
+ for (int i = 0; i < addrsToCheck; ++i) {
+ InetSocketAddress addr = addrs.get(addrIdx.getAndIncrement());
+
+ try (Socket sock = new Socket()) {
+ if (liveAddrHolder.get() == null) {
+ sock.connect(addr, perAddrTimeout);
+
+ liveAddrHolder.compareAndSet(null, addr);
+ }
+ }
+ catch (Exception ignored) {
+ // No-op.
+ }
+ finally {
+ latch.countDown();
+ }
+ }
+ }
+ });
}
- catch (ConnectException e) {
- return true;
+
+ try {
+ latch.await(timeout, TimeUnit.MILLISECONDS);
}
- catch (IOException e) {
- return false;
+ catch (InterruptedException ignored) {
+ // No-op.
}
- return false;
+ return liveAddrHolder.get();
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
index 73cb5f7..d727550 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
@@ -107,7 +107,7 @@ public class GridFailFastNodeFailureDetectionSelfTest extends GridCommonAbstract
failNode(ignite1);
- assert failLatch.await(1500, MILLISECONDS);
+ assert failLatch.await(ignite1.configuration().getFailureDetectionTimeout(), MILLISECONDS);
}
/**