You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this plain-text copy.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/03/04 14:36:55 UTC

[ignite] branch master updated: IGNITE-14231 IGNITE_ENABLE_FORCIBLE_NODE_KILL flag support in inverse connection protocol - Fixes #8826.

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e99bdba  IGNITE-14231 IGNITE_ENABLE_FORCIBLE_NODE_KILL flag support in inverse connection protocol - Fixes #8826.
e99bdba is described below

commit e99bdbab4f906b5b4b40ee0aa60e2da18e6443af
Author: Sergey Chugunov <se...@gmail.com>
AuthorDate: Thu Mar 4 17:30:02 2021 +0300

    IGNITE-14231 IGNITE_ENABLE_FORCIBLE_NODE_KILL flag support in inverse connection protocol - Fixes #8826.
    
    Signed-off-by: Ivan Bessonov <be...@gmail.com>
---
 .../tcp/internal/CommunicationTcpUtils.java        | 40 ++++++++++++-
 .../tcp/internal/ConnectionClientPool.java         | 20 +++++--
 .../tcp/internal/GridNioServerWrapper.java         | 39 +++++--------
 ...unicationInverseConnectionEstablishingTest.java | 67 +++++++++++++++++++---
 4 files changed, 128 insertions(+), 38 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationTcpUtils.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationTcpUtils.java
index d0855d0..68dd6bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationTcpUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationTcpUtils.java
@@ -28,11 +28,14 @@ import java.util.List;
 import java.util.Set;
 import java.util.function.Supplier;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.spi.IgniteSpiContext;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
 import org.apache.ignite.spi.communication.tcp.AttributeNames;
 
@@ -43,6 +46,10 @@ public class CommunicationTcpUtils {
     /** No-op runnable. */
     public static final IgniteRunnable NOOP = () -> {};
 
+    /** Whether {@link IgniteSystemProperties#IGNITE_TROUBLESHOOTING_LOGGER} is enabled. */
+    private static final boolean TROUBLESHOOTING_LOG_ENABLED = IgniteSystemProperties
+        .getBoolean(IgniteSystemProperties.IGNITE_TROUBLESHOOTING_LOGGER);
+
     /**
      * @param node Node.
      * @return {@code True} if can use in/out connection pair for communication.
@@ -154,7 +161,7 @@ public class CommunicationTcpUtils {
      * @param errs Error.
      * @return {@code True} if error was caused by some connection IO error or IgniteCheckedException due to timeout.
      */
-    public static boolean isRecoverableException(Exception errs) {
+    public static boolean isRecoverableException(Throwable errs) {
         return X.hasCause(
             errs,
             IOException.class,
@@ -162,4 +169,35 @@ public class CommunicationTcpUtils {
             IgniteSpiOperationTimeoutException.class
         );
     }
+
+    /**
+     * Forcibly fails a client node.
+     *
+     * Used only when a client node is visible to discovery but is not reachable via the communication protocol.
+     *
+     * @param nodeToFail Client node to forcibly fail.
+     * @param spiCtx Context used to request the node failure.
+     * @param err Error that caused the client to be failed; included in the failure message.
+     * @param log Logger to print the message about the failed node to.
+     */
+    public static void failNode(ClusterNode nodeToFail,
+        IgniteSpiContext spiCtx,
+        Throwable err,
+        IgniteLogger log
+    ) {
+        assert nodeToFail.isClient();
+
+        String logMsg = "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
+            "cluster [rmtNode=" + nodeToFail + ']';
+
+        if (TROUBLESHOOTING_LOG_ENABLED)
+            U.error(log, logMsg, err);
+        else
+            U.warn(log, logMsg);
+
+        spiCtx.failNode(nodeToFail.id(), "TcpCommunicationSpi failed to establish connection to node [" +
+            "rmtNode=" + nodeToFail +
+            ", err=" + err +
+            ", connectErrs=" + X.getSuppressedList(err) + ']');
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
index 8445dcd..329ebc3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
@@ -31,6 +31,7 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
@@ -121,6 +122,10 @@ public class ConnectionClientPool {
     /** Scheduled executor service which closed the socket if handshake timeout is out. **/
     private final ScheduledExecutorService handshakeTimeoutExecutorService;
 
+    /** Enable forcible node kill ({@link IgniteSystemProperties#IGNITE_ENABLE_FORCIBLE_NODE_KILL}). */
+    private final boolean forcibleNodeKillEnabled = IgniteSystemProperties
+        .getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+
     /**
      * @param cfg Config.
      * @param attrs Attributes.
@@ -381,10 +386,17 @@ public class ConnectionClientPool {
                     ? cfg.failureDetectionTimeout()
                     : cfg.connectionTimeout();
 
-                fut.get(failTimeout);
-            }
-            catch (IgniteCheckedException triggerException) {
-                IgniteSpiException spiE = new IgniteSpiException(triggerException);
+                    fut.get(failTimeout);
+                }
+                catch (Throwable triggerException) {
+                    if (forcibleNodeKillEnabled
+                        && node.isClient()
+                        && triggerException instanceof IgniteFutureTimeoutCheckedException
+                    ) {
+                        CommunicationTcpUtils.failNode(node, tcpCommSpi.getSpiContext(), triggerException, log);
+                    }
+                    // Cause is the inverse-connection wait failure; the original connect error 'e' is attached as suppressed.
+                    IgniteSpiException spiE = new IgniteSpiException(triggerException);

                 spiE.addSuppressed(e);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
index 6bc1502..1509d74 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
@@ -203,13 +203,9 @@ public class GridNioServerWrapper {
     private volatile ThrowableSupplier<SocketChannel, IOException> socketChannelFactory = SocketChannel::open;
 
     /** Enable forcible node kill. */
-    private boolean enableForcibleNodeKill = IgniteSystemProperties
+    private boolean forcibleNodeKillEnabled = IgniteSystemProperties
         .getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
 
-    /** Enable troubleshooting logger. */
-    private boolean enableTroubleshootingLog = IgniteSystemProperties
-        .getBoolean(IgniteSystemProperties.IGNITE_TROUBLESHOOTING_LOGGER);
-
     /** NIO server. */
     private GridNioServer<Message> nioSrv;
 
@@ -647,12 +643,16 @@ public class GridNioServerWrapper {
         }
 
         if (ses == null) {
-            if (!(Thread.currentThread() instanceof IgniteDiscoveryThread) && locNodeIsSrv) {
-                if (node.isClient() && (addrs.size() - skippedAddrs == failedAddrsSet.size())) {
-                    String msg = "Failed to connect to all addresses of node " + node.id() + ": " + failedAddrsSet +
-                        "; inverse connection will be requested.";
-
-                    throw new NodeUnreachableException(msg);
+            // If both local and remote nodes are configured to use paired connections, inverse connection
+            // is never requested, so there is no point in throwing NodeUnreachableException.
+            if (!cfg.usePairedConnections() || !Boolean.TRUE.equals(node.attribute(attrs.pairedConnection()))) {
+                if (!(Thread.currentThread() instanceof IgniteDiscoveryThread) && locNodeIsSrv) {
+                    if (node.isClient() && (addrs.size() - skippedAddrs == failedAddrsSet.size())) {
+                        String msg = "Failed to connect to all addresses of node " + node.id() + ": " + failedAddrsSet +
+                            "; inverse connection will be requested.";
+
+                        throw new NodeUnreachableException(msg);
+                    }
                 }
             }
 
@@ -768,25 +768,14 @@ public class GridNioServerWrapper {
             ctx.resolveCommunicationFailure(node, errs);
         }
 
-        if (!commErrResolve && enableForcibleNodeKill) {
+        if (!commErrResolve && forcibleNodeKillEnabled) {
             if (ctx.node(node.id()) != null
                 && node.isClient()
                 && !locNodeSupplier.get().isClient()
                 && isRecoverableException(errs)
             ) {
-                // Only server can fail client for now, as in TcpDiscovery resolveCommunicationFailure() is not supported.
-                String msg = "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
-                    "cluster [" + "rmtNode=" + node + ']';
-
-                if (enableTroubleshootingLog)
-                    U.error(log, msg, errs);
-                else
-                    U.warn(log, msg);
-
-                ctx.failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" +
-                    "rmtNode=" + node +
-                    ", errs=" + errs +
-                    ", connectErrs=" + X.getSuppressedList(errs) + ']');
+                // Only server can fail client: TcpDiscovery does not support resolveCommunicationFailure().
+                CommunicationTcpUtils.failNode(node, ctx, errs, log);
             }
         }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationInverseConnectionEstablishingTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationInverseConnectionEstablishingTest.java
index cbf4777..7221d52 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationInverseConnectionEstablishingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationInverseConnectionEstablishingTest.java
@@ -29,11 +29,14 @@ import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -41,6 +44,7 @@ import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.util.nio.GridCommunicationClient;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.internal.TcpInverseConnectionResponseMessage;
@@ -50,6 +54,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.ListeningTestLogger;
 import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Assume;
 import org.junit.Test;
@@ -299,13 +304,29 @@ public class GridTcpCommunicationInverseConnectionEstablishingTest extends GridC
 
         IgniteEx srv = grid(SRVS_NUM - 1);
 
-        // We need to interrupt communication worker client nodes so that
-        // closed connection won't automatically reopen when we don't expect it.
-        // Server communication worker is interrupted for another reason - it can hang the test
-        // due to bug in inverse connection protocol & comm worker - it will be fixed later.
+        interruptCommWorkerThreads(client.name());
+
+        TcpCommunicationSpi spi = (TcpCommunicationSpi)srv.configuration().getCommunicationSpi();
+
+        GridTestUtils.invoke(spi, "onNodeLeft", clientNode.consistentId(), clientNode.id());
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() ->
+            srv.context().io().sendIoTest(clientNode, new byte[10], false).get()
+        );
+
+        assertTrue(GridTestUtils.waitForCondition(fut::isDone, 30_000));
+
+        assertTrue(lsnr.check());
+    }
+
+    /**
+     * We need to interrupt communication worker client nodes so that
+     * closed connection won't automatically reopen when we don't expect it.
+     */
+    private void interruptCommWorkerThreads(String clientName) {
         List<Thread> tcpCommWorkerThreads = Thread.getAllStackTraces().keySet().stream()
             .filter(t -> t.getName().contains("tcp-comm-worker"))
-            .filter(t -> t.getName().contains(srv.name()) || t.getName().contains(client.name()))
+            .filter(t -> t.getName().contains(clientName))
             .collect(Collectors.toList());
 
         for (Thread tcpCommWorkerThread : tcpCommWorkerThreads) {
@@ -313,6 +334,38 @@ public class GridTcpCommunicationInverseConnectionEstablishingTest extends GridC
 
             U.join(tcpCommWorkerThread, log);
         }
+    }
+
+    /**
+     * Forcible node kill functionality is triggered in inverse connection request flow as well
+     * when a timeout for inverse connection is reached.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    @WithSystemProperty(key = IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL, value = "true")
+    public void testClientSkippingInverseConnResponseIsForciblyFailed() throws Exception {
+        UNREACHABLE_DESTINATION.set(UNRESOLVED_HOST);
+        RESPOND_TO_INVERSE_REQUEST.set(false);
+
+        AtomicBoolean clientFailedEventFlag = new AtomicBoolean(false);
+
+        IgniteEx srv = startGrid();
+
+        srv.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event event) {
+                clientFailedEventFlag.set(true);
+
+                return false;
+            }
+        }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
+
+        forceClientToSrvConnections = false;
+
+        IgniteEx client = startClientGrid(1);
+        ClusterNode clientNode = client.localNode();
+
+        interruptCommWorkerThreads(client.name());
 
         TcpCommunicationSpi spi = (TcpCommunicationSpi)srv.configuration().getCommunicationSpi();
 
@@ -322,9 +375,7 @@ public class GridTcpCommunicationInverseConnectionEstablishingTest extends GridC
             srv.context().io().sendIoTest(clientNode, new byte[10], false).get()
         );
 
-        assertTrue(GridTestUtils.waitForCondition(fut::isDone, 30_000));
-
-        assertTrue(lsnr.check());
+        assertTrue(GridTestUtils.waitForCondition(clientFailedEventFlag::get, 10_000));
     }
 
     /**