You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/02/18 06:51:17 UTC
[ignite] branch master updated: IGNITE-14153 closing for stale outcoming connections implemented - Fixes #8788.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new bbae673 IGNITE-14153 closing for stale outcoming connections implemented - Fixes #8788.
bbae673 is described below
commit bbae6735f73675ca86c9d2061e03d53a8884bd3d
Author: Igor Belyakov <ig...@gmail.com>
AuthorDate: Thu Feb 18 09:48:11 2021 +0300
IGNITE-14153 closing for stale outcoming connections implemented - Fixes #8788.
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../tcp/internal/InboundConnectionHandler.java | 2 +-
...cpCommunicationSpiHalfOpenedConnectionTest.java | 76 ++++++++++++++++------
2 files changed, 56 insertions(+), 22 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/InboundConnectionHandler.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/InboundConnectionHandler.java
index 4bc69ae..9b7a6d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/InboundConnectionHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/InboundConnectionHandler.java
@@ -688,7 +688,7 @@ public class InboundConnectionHandler extends GridNioServerListenerAdapter<Messa
for (GridNioSession ses0 : nioSrvWrapper.nio().sessions()) {
ConnectionKey key0 = ses0.meta(CONN_IDX_META);
- if (ses0.accepted() && key0 != null &&
+ if (key0 != null &&
key0.nodeId().equals(connKey.nodeId()) &&
key0.connectionIndex() == connKey.connectionIndex() &&
key0.connectCount() < connKey.connectCount())
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
index a59b190..50cf26c 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.util.nio.GridNioServerListener;
import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -38,9 +39,6 @@ import org.junit.Test;
* Tests case when connection is closed only for one side, when other is not notified.
*/
public class TcpCommunicationSpiHalfOpenedConnectionTest extends GridCommonAbstractTest {
- /** Client spi. */
- private TcpCommunicationSpi clientSpi;
-
/** Paired connections. */
private boolean pairedConnections;
@@ -48,9 +46,6 @@ public class TcpCommunicationSpiHalfOpenedConnectionTest extends GridCommonAbstr
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- if (igniteInstanceName.contains("client"))
- clientSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi();
-
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setUsePairedConnections(pairedConnections);
return cfg;
@@ -68,7 +63,7 @@ public class TcpCommunicationSpiHalfOpenedConnectionTest extends GridCommonAbstr
public void testReconnect() throws Exception {
pairedConnections = false;
- checkReconnect();
+ checkReconnect(false);
}
/**
@@ -78,32 +73,71 @@ public class TcpCommunicationSpiHalfOpenedConnectionTest extends GridCommonAbstr
public void testReconnectPaired() throws Exception {
pairedConnections = true;
- checkReconnect();
+ checkReconnect(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testReverseReconnect() throws Exception {
+ pairedConnections = false;
+
+ checkReconnect(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testReverseReconnectPaired() throws Exception {
+ pairedConnections = true;
+
+ checkReconnect(true);
}
/**
* @throws Exception If failed.
*/
- private void checkReconnect() throws Exception {
- Ignite srv = startGrid("server");
- Ignite client = startClientGrid("client");
+ private void checkReconnect(boolean reverseReconnect) throws Exception {
+ Ignite srv = startGrid(0);
+ Ignite client = startClientGrid(1);
- UUID nodeId = srv.cluster().localNode().id();
+ UUID srvNodeId = srv.cluster().localNode().id();
+ UUID clientNodeId = client.cluster().localNode().id();
- System.out.println(">> Server ID: " + nodeId);
+ System.out.println(">> Server ID: " + srvNodeId);
+ System.out.println(">> Client ID: " + clientNodeId);
- ClusterGroup srvGrp = client.cluster().forNodeId(nodeId);
+ ClusterGroup srvGrp = client.cluster().forNodeId(srvNodeId);
+ ClusterGroup clientGrp = srv.cluster().forNodeId(clientNodeId);
System.out.println(">> Send job");
// Establish connection
client.compute(srvGrp).run(F.noop());
- ConcurrentMap<UUID, GridCommunicationClient[]> clients = GridTestUtils.getFieldValue(clientSpi, "clientPool", "clients");
- ConcurrentMap<?, GridNioRecoveryDescriptor> recoveryDescs = GridTestUtils.getFieldValue(clientSpi, "nioSrvWrapper", "recoveryDescs");
- ConcurrentMap<?, GridNioRecoveryDescriptor> outRecDescs = GridTestUtils.getFieldValue(clientSpi, "nioSrvWrapper", "outRecDescs");
- ConcurrentMap<?, GridNioRecoveryDescriptor> inRecDescs = GridTestUtils.getFieldValue(clientSpi, "nioSrvWrapper", "inRecDescs");
- GridNioServerListener<Message> lsnr = GridTestUtils.getFieldValue(clientSpi, "nioSrvWrapper", "srvLsnr");
+ if (reverseReconnect)
+ reconnect(srv, client, clientGrp);
+ else
+ reconnect(client, srv, srvGrp);
+ }
+
+ /**
+ * Reconnects the {@code srcNode} to the {@code targetNode}.
+ *
+ * @param srcNode Source node.
+ * @param targetNode Target node.
+ * @param targetGrp Target cluster group.
+ */
+ private void reconnect(Ignite srcNode, Ignite targetNode, ClusterGroup targetGrp) {
+ CommunicationSpi commSpi = srcNode.configuration().getCommunicationSpi();
+
+ ConcurrentMap<UUID, GridCommunicationClient[]> clients = GridTestUtils.getFieldValue(commSpi, "clientPool", "clients");
+ ConcurrentMap<?, GridNioRecoveryDescriptor> recoveryDescs = GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "recoveryDescs");
+ ConcurrentMap<?, GridNioRecoveryDescriptor> outRecDescs = GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "outRecDescs");
+ ConcurrentMap<?, GridNioRecoveryDescriptor> inRecDescs = GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "inRecDescs");
+ GridNioServerListener<Message> lsnr = GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "srvLsnr");
Iterator<GridNioRecoveryDescriptor> it = F.concat(
recoveryDescs.values().iterator(),
@@ -122,7 +156,7 @@ public class TcpCommunicationSpiHalfOpenedConnectionTest extends GridCommonAbstr
// Remove client to avoid calling close(), in that case server
// will close connection too, but we want to keep the server
// uninformed and force ping old connection.
- GridCommunicationClient[] clients0 = clients.remove(nodeId);
+ GridCommunicationClient[] clients0 = clients.remove(targetNode.cluster().localNode().id());
for (GridCommunicationClient commClient : clients0)
lsnr.onDisconnected(((GridTcpNioCommunicationClient)commClient).session(), new IOException("Test exception"));
@@ -130,7 +164,7 @@ public class TcpCommunicationSpiHalfOpenedConnectionTest extends GridCommonAbstr
info(">> Removed client");
// Reestablish connection
- client.compute(srvGrp).run(F.noop());
+ srcNode.compute(targetGrp).run(F.noop());
info(">> Sent second job");
}