You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/03/04 14:36:55 UTC
[ignite] branch master updated: IGNITE-14231
IGNITE_ENABLE_FORCIBLE_NODE_KILL flag support in inverse connection
protocol - Fixes #8826.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e99bdba IGNITE-14231 IGNITE_ENABLE_FORCIBLE_NODE_KILL flag support in inverse connection protocol - Fixes #8826.
e99bdba is described below
commit e99bdbab4f906b5b4b40ee0aa60e2da18e6443af
Author: Sergey Chugunov <se...@gmail.com>
AuthorDate: Thu Mar 4 17:30:02 2021 +0300
IGNITE-14231 IGNITE_ENABLE_FORCIBLE_NODE_KILL flag support in inverse connection protocol - Fixes #8826.
Signed-off-by: Ivan Bessonov <be...@gmail.com>
---
.../tcp/internal/CommunicationTcpUtils.java | 40 ++++++++++++-
.../tcp/internal/ConnectionClientPool.java | 20 +++++--
.../tcp/internal/GridNioServerWrapper.java | 39 +++++--------
...unicationInverseConnectionEstablishingTest.java | 67 +++++++++++++++++++---
4 files changed, 128 insertions(+), 38 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationTcpUtils.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationTcpUtils.java
index d0855d0..68dd6bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationTcpUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationTcpUtils.java
@@ -28,11 +28,14 @@ import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.communication.tcp.AttributeNames;
@@ -43,6 +46,10 @@ public class CommunicationTcpUtils {
/** No-op runnable. */
public static final IgniteRunnable NOOP = () -> {};
+ /** */
+ private static final boolean THROUBLESHOOTING_LOG_ENABLED = IgniteSystemProperties
+ .getBoolean(IgniteSystemProperties.IGNITE_TROUBLESHOOTING_LOGGER);
+
/**
* @param node Node.
* @return {@code True} if can use in/out connection pair for communication.
@@ -154,7 +161,7 @@ public class CommunicationTcpUtils {
* @param errs Error.
* @return {@code True} if error was caused by some connection IO error or IgniteCheckedException due to timeout.
*/
- public static boolean isRecoverableException(Exception errs) {
+ public static boolean isRecoverableException(Throwable errs) {
return X.hasCause(
errs,
IOException.class,
@@ -162,4 +169,35 @@ public class CommunicationTcpUtils {
IgniteSpiOperationTimeoutException.class
);
}
+
+ /**
+ * Forcibly fails client node.
+ *
+ * Used in a single situation: when a client node is visible to discovery but is not reachable via the communication protocol.
+ *
+ * @param nodeToFail Client node to forcibly fail.
+ * @param spiCtx Context to request node failing.
+ * @param err Error to fail client with.
+ * @param log Logger to print message about failed node to.
+ */
+ public static void failNode(ClusterNode nodeToFail,
+ IgniteSpiContext spiCtx,
+ Throwable err,
+ IgniteLogger log
+ ) {
+ assert nodeToFail.isClient();
+
+ String logMsg = "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
+ "cluster [rmtNode=" + nodeToFail + ']';
+
+ if (THROUBLESHOOTING_LOG_ENABLED)
+ U.error(log, logMsg, err);
+ else
+ U.warn(log, logMsg);
+
+ spiCtx.failNode(nodeToFail.id(), "TcpCommunicationSpi failed to establish connection to node [" +
+ "rmtNode=" + nodeToFail +
+ ", err=" + err +
+ ", connectErrs=" + X.getSuppressedList(err) + ']');
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
index 8445dcd..329ebc3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
@@ -31,6 +31,7 @@ import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
@@ -121,6 +122,10 @@ public class ConnectionClientPool {
/** Scheduled executor service which closed the socket if handshake timeout is out. **/
private final ScheduledExecutorService handshakeTimeoutExecutorService;
+ /** Enable forcible node kill. */
+ private boolean forcibleNodeKillEnabled = IgniteSystemProperties
+ .getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+
/**
* @param cfg Config.
* @param attrs Attributes.
@@ -381,10 +386,17 @@ public class ConnectionClientPool {
? cfg.failureDetectionTimeout()
: cfg.connectionTimeout();
- fut.get(failTimeout);
- }
- catch (IgniteCheckedException triggerException) {
- IgniteSpiException spiE = new IgniteSpiException(triggerException);
+ fut.get(failTimeout);
+ }
+ catch (Throwable triggerException) {
+ if (forcibleNodeKillEnabled
+ && node.isClient()
+ && triggerException instanceof IgniteFutureTimeoutCheckedException
+ ) {
+ CommunicationTcpUtils.failNode(node, tcpCommSpi.getSpiContext(), triggerException, log);
+ }
+
+ IgniteSpiException spiE = new IgniteSpiException(e);
spiE.addSuppressed(e);
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
index 6bc1502..1509d74 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
@@ -203,13 +203,9 @@ public class GridNioServerWrapper {
private volatile ThrowableSupplier<SocketChannel, IOException> socketChannelFactory = SocketChannel::open;
/** Enable forcible node kill. */
- private boolean enableForcibleNodeKill = IgniteSystemProperties
+ private boolean forcibleNodeKillEnabled = IgniteSystemProperties
.getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
- /** Enable troubleshooting logger. */
- private boolean enableTroubleshootingLog = IgniteSystemProperties
- .getBoolean(IgniteSystemProperties.IGNITE_TROUBLESHOOTING_LOGGER);
-
/** NIO server. */
private GridNioServer<Message> nioSrv;
@@ -647,12 +643,16 @@ public class GridNioServerWrapper {
}
if (ses == null) {
- if (!(Thread.currentThread() instanceof IgniteDiscoveryThread) && locNodeIsSrv) {
- if (node.isClient() && (addrs.size() - skippedAddrs == failedAddrsSet.size())) {
- String msg = "Failed to connect to all addresses of node " + node.id() + ": " + failedAddrsSet +
- "; inverse connection will be requested.";
-
- throw new NodeUnreachableException(msg);
+ // If local node and remote node are configured to use paired connections we won't even request
+ // inverse connection so no point in throwing NodeUnreachableException
+ if (!cfg.usePairedConnections() || !Boolean.TRUE.equals(node.attribute(attrs.pairedConnection()))) {
+ if (!(Thread.currentThread() instanceof IgniteDiscoveryThread) && locNodeIsSrv) {
+ if (node.isClient() && (addrs.size() - skippedAddrs == failedAddrsSet.size())) {
+ String msg = "Failed to connect to all addresses of node " + node.id() + ": " + failedAddrsSet +
+ "; inverse connection will be requested.";
+
+ throw new NodeUnreachableException(msg);
+ }
}
}
@@ -768,25 +768,14 @@ public class GridNioServerWrapper {
ctx.resolveCommunicationFailure(node, errs);
}
- if (!commErrResolve && enableForcibleNodeKill) {
+ if (!commErrResolve && forcibleNodeKillEnabled) {
if (ctx.node(node.id()) != null
&& node.isClient()
&& !locNodeSupplier.get().isClient()
&& isRecoverableException(errs)
) {
- // Only server can fail client for now, as in TcpDiscovery resolveCommunicationFailure() is not supported.
- String msg = "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
- "cluster [" + "rmtNode=" + node + ']';
-
- if (enableTroubleshootingLog)
- U.error(log, msg, errs);
- else
- U.warn(log, msg);
-
- ctx.failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" +
- "rmtNode=" + node +
- ", errs=" + errs +
- ", connectErrs=" + X.getSuppressedList(errs) + ']');
+ CommunicationTcpUtils.failNode(node,
+ ctx, errs, log);
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationInverseConnectionEstablishingTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationInverseConnectionEstablishingTest.java
index cbf4777..7221d52 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationInverseConnectionEstablishingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationInverseConnectionEstablishingTest.java
@@ -29,11 +29,14 @@ import java.util.stream.Collectors;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -41,6 +44,7 @@ import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.internal.TcpInverseConnectionResponseMessage;
@@ -50,6 +54,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Assume;
import org.junit.Test;
@@ -299,13 +304,29 @@ public class GridTcpCommunicationInverseConnectionEstablishingTest extends GridC
IgniteEx srv = grid(SRVS_NUM - 1);
- // We need to interrupt communication worker client nodes so that
- // closed connection won't automatically reopen when we don't expect it.
- // Server communication worker is interrupted for another reason - it can hang the test
- // due to bug in inverse connection protocol & comm worker - it will be fixed later.
+ interruptCommWorkerThreads(client.name());
+
+ TcpCommunicationSpi spi = (TcpCommunicationSpi)srv.configuration().getCommunicationSpi();
+
+ GridTestUtils.invoke(spi, "onNodeLeft", clientNode.consistentId(), clientNode.id());
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() ->
+ srv.context().io().sendIoTest(clientNode, new byte[10], false).get()
+ );
+
+ assertTrue(GridTestUtils.waitForCondition(fut::isDone, 30_000));
+
+ assertTrue(lsnr.check());
+ }
+
+ /**
+ * We need to interrupt the communication worker threads of client nodes so that
+ * a closed connection won't automatically reopen when we don't expect it.
+ */
+ private void interruptCommWorkerThreads(String clientName) {
List<Thread> tcpCommWorkerThreads = Thread.getAllStackTraces().keySet().stream()
.filter(t -> t.getName().contains("tcp-comm-worker"))
- .filter(t -> t.getName().contains(srv.name()) || t.getName().contains(client.name()))
+ .filter(t -> t.getName().contains(clientName))
.collect(Collectors.toList());
for (Thread tcpCommWorkerThread : tcpCommWorkerThreads) {
@@ -313,6 +334,38 @@ public class GridTcpCommunicationInverseConnectionEstablishingTest extends GridC
U.join(tcpCommWorkerThread, log);
}
+ }
+
+ /**
+ * Forcible node kill functionality is triggered in inverse connection request flow as well
+ * when a timeout for inverse connection is reached.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL, value = "true")
+ public void testClientSkippingInverseConnResponseIsForciblyFailed() throws Exception {
+ UNREACHABLE_DESTINATION.set(UNRESOLVED_HOST);
+ RESPOND_TO_INVERSE_REQUEST.set(false);
+
+ AtomicBoolean clientFailedEventFlag = new AtomicBoolean(false);
+
+ IgniteEx srv = startGrid();
+
+ srv.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event event) {
+ clientFailedEventFlag.set(true);
+
+ return false;
+ }
+ }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
+
+ forceClientToSrvConnections = false;
+
+ IgniteEx client = startClientGrid(1);
+ ClusterNode clientNode = client.localNode();
+
+ interruptCommWorkerThreads(client.name());
TcpCommunicationSpi spi = (TcpCommunicationSpi)srv.configuration().getCommunicationSpi();
@@ -322,9 +375,7 @@ public class GridTcpCommunicationInverseConnectionEstablishingTest extends GridC
srv.context().io().sendIoTest(clientNode, new byte[10], false).get()
);
- assertTrue(GridTestUtils.waitForCondition(fut::isDone, 30_000));
-
- assertTrue(lsnr.check());
+ assertTrue(GridTestUtils.waitForCondition(clientFailedEventFlag::get, 10_000));
}
/**