You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/02/18 06:51:17 UTC

[ignite] branch master updated: IGNITE-14153 closing for stale outcoming connections implemented - Fixes #8788.

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new bbae673  IGNITE-14153 closing for stale outcoming connections implemented - Fixes #8788.
bbae673 is described below

commit bbae6735f73675ca86c9d2061e03d53a8884bd3d
Author: Igor Belyakov <ig...@gmail.com>
AuthorDate: Thu Feb 18 09:48:11 2021 +0300

    IGNITE-14153 closing for stale outcoming connections implemented - Fixes #8788.
    
    Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
 .../tcp/internal/InboundConnectionHandler.java     |  2 +-
 ...cpCommunicationSpiHalfOpenedConnectionTest.java | 76 ++++++++++++++++------
 2 files changed, 56 insertions(+), 22 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/InboundConnectionHandler.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/InboundConnectionHandler.java
index 4bc69ae..9b7a6d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/InboundConnectionHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/InboundConnectionHandler.java
@@ -688,7 +688,7 @@ public class InboundConnectionHandler extends GridNioServerListenerAdapter<Messa
         for (GridNioSession ses0 : nioSrvWrapper.nio().sessions()) {
             ConnectionKey key0 = ses0.meta(CONN_IDX_META);
 
-            if (ses0.accepted() && key0 != null &&
+            if (key0 != null &&
                 key0.nodeId().equals(connKey.nodeId()) &&
                 key0.connectionIndex() == connKey.connectionIndex() &&
                 key0.connectCount() < connKey.connectCount())
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
index a59b190..50cf26c 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.util.nio.GridNioServerListener;
 import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.CommunicationSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
@@ -38,9 +39,6 @@ import org.junit.Test;
  * Tests case when connection is closed only for one side, when other is not notified.
  */
 public class TcpCommunicationSpiHalfOpenedConnectionTest extends GridCommonAbstractTest {
-    /** Client spi. */
-    private TcpCommunicationSpi clientSpi;
-
     /** Paired connections. */
     private boolean pairedConnections;
 
@@ -48,9 +46,6 @@ public class TcpCommunicationSpiHalfOpenedConnectionTest extends GridCommonAbstr
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        if (igniteInstanceName.contains("client"))
-            clientSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi();
-
         ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setUsePairedConnections(pairedConnections);
 
         return cfg;
@@ -68,7 +63,7 @@ public class TcpCommunicationSpiHalfOpenedConnectionTest extends GridCommonAbstr
     public void testReconnect() throws Exception {
         pairedConnections = false;
 
-        checkReconnect();
+        checkReconnect(false);
     }
 
     /**
@@ -78,32 +73,71 @@ public class TcpCommunicationSpiHalfOpenedConnectionTest extends GridCommonAbstr
     public void testReconnectPaired() throws Exception {
         pairedConnections = true;
 
-        checkReconnect();
+        checkReconnect(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testReverseReconnect() throws Exception {
+        pairedConnections = false;
+
+        checkReconnect(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testReverseReconnectPaired() throws Exception {
+        pairedConnections = true;
+
+        checkReconnect(true);
     }
 
     /**
      * @throws Exception If failed.
      */
-    private void checkReconnect() throws Exception {
-        Ignite srv = startGrid("server");
-        Ignite client = startClientGrid("client");
+    private void checkReconnect(boolean reverseReconnect) throws Exception {
+        Ignite srv = startGrid(0);
+        Ignite client = startClientGrid(1);
 
-        UUID nodeId = srv.cluster().localNode().id();
+        UUID srvNodeId = srv.cluster().localNode().id();
+        UUID clientNodeId = client.cluster().localNode().id();
 
-        System.out.println(">> Server ID: " + nodeId);
+        System.out.println(">> Server ID: " + srvNodeId);
+        System.out.println(">> Client ID: " + clientNodeId);
 
-        ClusterGroup srvGrp = client.cluster().forNodeId(nodeId);
+        ClusterGroup srvGrp = client.cluster().forNodeId(srvNodeId);
+        ClusterGroup clientGrp = srv.cluster().forNodeId(clientNodeId);
 
         System.out.println(">> Send job");
 
         // Establish connection
         client.compute(srvGrp).run(F.noop());
 
-        ConcurrentMap<UUID, GridCommunicationClient[]> clients = GridTestUtils.getFieldValue(clientSpi, "clientPool", "clients");
-        ConcurrentMap<?, GridNioRecoveryDescriptor> recoveryDescs = GridTestUtils.getFieldValue(clientSpi, "nioSrvWrapper", "recoveryDescs");
-        ConcurrentMap<?, GridNioRecoveryDescriptor> outRecDescs = GridTestUtils.getFieldValue(clientSpi, "nioSrvWrapper", "outRecDescs");
-        ConcurrentMap<?, GridNioRecoveryDescriptor> inRecDescs = GridTestUtils.getFieldValue(clientSpi, "nioSrvWrapper", "inRecDescs");
-        GridNioServerListener<Message> lsnr = GridTestUtils.getFieldValue(clientSpi, "nioSrvWrapper", "srvLsnr");
+        if (reverseReconnect)
+            reconnect(srv, client, clientGrp);
+        else
+            reconnect(client, srv, srvGrp);
+    }
+
+    /**
+     * Reconnects the {@code srcNode} to the {@code targetNode}.
+     *
+     * @param srcNode Source node.
+     * @param targetNode Target node.
+     * @param targetGrp Target cluster group.
+     */
+    private void reconnect(Ignite srcNode, Ignite targetNode, ClusterGroup targetGrp) {
+        CommunicationSpi commSpi = srcNode.configuration().getCommunicationSpi();
+
+        ConcurrentMap<UUID, GridCommunicationClient[]> clients = GridTestUtils.getFieldValue(commSpi, "clientPool", "clients");
+        ConcurrentMap<?, GridNioRecoveryDescriptor> recoveryDescs = GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "recoveryDescs");
+        ConcurrentMap<?, GridNioRecoveryDescriptor> outRecDescs = GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "outRecDescs");
+        ConcurrentMap<?, GridNioRecoveryDescriptor> inRecDescs = GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "inRecDescs");
+        GridNioServerListener<Message> lsnr = GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "srvLsnr");
 
         Iterator<GridNioRecoveryDescriptor> it = F.concat(
             recoveryDescs.values().iterator(),
@@ -122,7 +156,7 @@ public class TcpCommunicationSpiHalfOpenedConnectionTest extends GridCommonAbstr
         // Remove client to avoid calling close(), in that case server
         // will close connection too, but we want to keep the server
         // uninformed and force ping old connection.
-        GridCommunicationClient[] clients0 = clients.remove(nodeId);
+        GridCommunicationClient[] clients0 = clients.remove(targetNode.cluster().localNode().id());
 
         for (GridCommunicationClient commClient : clients0)
             lsnr.onDisconnected(((GridTcpNioCommunicationClient)commClient).session(), new IOException("Test exception"));
@@ -130,7 +164,7 @@ public class TcpCommunicationSpiHalfOpenedConnectionTest extends GridCommonAbstr
         info(">> Removed client");
 
         // Reestablish connection
-        client.compute(srvGrp).run(F.noop());
+        srcNode.compute(targetGrp).run(F.noop());
 
         info(">> Sent second job");
     }