You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/13 09:52:19 UTC
[25/28] ignite git commit: IGNITE-6818 Handle half open connection in
communication.
IGNITE-6818 Handle half open connection in communication.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/191295d4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/191295d4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/191295d4
Branch: refs/heads/ignite-zk
Commit: 191295d45f53225d9e1e214c6fdd85b59e80d0ec
Parents: 132ec3f
Author: dkarachentsev <dk...@gridgain.com>
Authored: Mon Nov 13 10:35:21 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Nov 13 10:35:21 2017 +0300
----------------------------------------------------------------------
.../communication/tcp/TcpCommunicationSpi.java | 37 +++--
...ommunicationSpiHalfOpenedConnectionTest.java | 142 +++++++++++++++++++
.../IgniteSpiCommunicationSelfTestSuite.java | 2 +
3 files changed, 168 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/191295d4/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1bff8ee..49425ce 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -539,15 +539,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (c.failed) {
ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
- for (GridNioSession ses0 : nioSrvr.sessions()) {
- ConnectionKey key0 = ses0.meta(CONN_IDX_META);
-
- if (ses0.accepted() && key0 != null &&
- key0.nodeId().equals(connKey.nodeId()) &&
- key0.connectionIndex() == connKey.connectionIndex() &&
- key0.connectCount() < connKey.connectCount())
- ses0.close();
- }
+ closeStaleConnections(connKey);
}
}
}
@@ -567,11 +559,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (oldClient instanceof GridTcpNioCommunicationClient) {
if (log.isInfoEnabled())
log.info("Received incoming connection when already connected " +
- "to this node, rejecting [locNode=" + locNode.id() +
- ", rmtNode=" + sndId + ']');
+ "to this node, rejecting [locNode=" + locNode.id() +
+ ", rmtNode=" + sndId + ']');
ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
+ closeStaleConnections(connKey);
+
return;
}
else {
@@ -599,11 +593,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (log.isInfoEnabled())
log.info("Received incoming connection when already connected " +
- "to this node, rejecting [locNode=" + locNode.id() +
- ", rmtNode=" + sndId + ']');
+ "to this node, rejecting [locNode=" + locNode.id() +
+ ", rmtNode=" + sndId + ']');
ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
+ closeStaleConnections(connKey);
+
fut.onDone(oldClient);
return;
@@ -658,6 +654,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
+ /**
+ * @param connKey Connection key to compare against: accepted sessions with the same node ID and connection index but a lower connect count are closed.
+ */
+ private void closeStaleConnections(ConnectionKey connKey) {
+ for (GridNioSession ses0 : nioSrvr.sessions()) {
+ ConnectionKey key0 = ses0.meta(CONN_IDX_META);
+
+ if (ses0.accepted() && key0 != null &&
+ key0.nodeId().equals(connKey.nodeId()) &&
+ key0.connectionIndex() == connKey.connectionIndex() &&
+ key0.connectCount() < connKey.connectCount())
+ ses0.close();
+ }
+ }
+
@Override public void onMessage(final GridNioSession ses, Message msg) {
ConnectionKey connKey = ses.meta(CONN_IDX_META);
http://git-wip-us.apache.org/repos/asf/ignite/blob/191295d4/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
new file mode 100644
index 0000000..3e10f94
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.nio.GridCommunicationClient;
+import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests the case when a connection is closed on one side only and the other side is not notified.
+ */
+public class TcpCommunicationSpiHalfOpenedConnectionTest extends GridCommonAbstractTest {
+ /** Communication SPI of the client node, captured in {@link #getConfiguration(String)}. */
+ private TcpCommunicationSpi clientSpi;
+
+ /** Value passed to {@code setUsePairedConnections} when a node configuration is built. */
+ private boolean pairedConnections;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ if (igniteInstanceName.contains("client")) {
+ cfg.setClientMode(true);
+
+ clientSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi();
+ }
+
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setUsePairedConnections(pairedConnections);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids(true);
+ }
+
+ /**
+ * @throws Exception If the reconnect check with paired connections disabled fails.
+ */
+ public void testReconnect() throws Exception {
+ pairedConnections = false;
+
+ checkReconnect();
+ }
+
+ /**
+ * @throws Exception If the reconnect check with paired connections enabled fails.
+ */
+ public void testReconnectPaired() throws Exception {
+ pairedConnections = true;
+
+ checkReconnect();
+ }
+
+ /**
+ * @throws Exception If starting the nodes or running either job fails.
+ */
+ private void checkReconnect() throws Exception {
+ Ignite srv = startGrid("server");
+ Ignite client = startGrid("client");
+
+ UUID nodeId = srv.cluster().localNode().id();
+
+ System.out.println(">> Server ID: " + nodeId);
+
+ ClusterGroup srvGrp = client.cluster().forNodeId(nodeId);
+
+ System.out.println(">> Send job");
+
+ // Establish the connection by sending a job from the client to the server.
+ client.compute(srvGrp).run(F.noop());
+
+ ConcurrentMap<UUID, GridCommunicationClient[]> clients = U.field(clientSpi, "clients");
+ ConcurrentMap<?, GridNioRecoveryDescriptor> recoveryDescs = U.field(clientSpi, "recoveryDescs");
+ ConcurrentMap<?, GridNioRecoveryDescriptor> outRecDescs = U.field(clientSpi, "outRecDescs");
+ ConcurrentMap<?, GridNioRecoveryDescriptor> inRecDescs = U.field(clientSpi, "inRecDescs");
+ GridNioServerListener<Message> lsnr = U.field(clientSpi, "srvLsnr");
+
+ Iterator<GridNioRecoveryDescriptor> it = F.concat(
+ recoveryDescs.values().iterator(),
+ outRecDescs.values().iterator(),
+ inRecDescs.values().iterator()
+ );
+
+ while (it.hasNext()) {
+ GridNioRecoveryDescriptor desc = it.next();
+
+ // Simulate the connection close performed by GridNioServer,
+ // which releases recovery descriptors on disconnect.
+ desc.release();
+ }
+
+ // Remove the client without calling close(): closing it would make the
+ // server close the connection too, but we want to keep the server
+ // uninformed and force a ping of the old connection.
+ GridCommunicationClient[] clients0 = clients.remove(nodeId);
+
+ for (GridCommunicationClient commClient : clients0)
+ lsnr.onDisconnected(((GridTcpNioCommunicationClient)commClient).session(), new IOException("Test exception"));
+
+ info(">> Removed client");
+
+ // Reestablish the connection by sending a second job to the server.
+ client.compute(srvGrp).run(F.noop());
+
+ info(">> Sent second job");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 30_000;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/191295d4/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index 77de3fc..8e96a3f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -38,6 +38,7 @@ import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiTcpSelfTes
import org.apache.ignite.spi.communication.tcp.IgniteTcpCommunicationRecoveryAckClosureSelfTest;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiDropNodesTest;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFaultyClientTest;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiHalfOpenedConnectionTest;
/**
* Test suite for all communication SPIs.
@@ -78,6 +79,7 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(TcpCommunicationSpiFaultyClientTest.class));
suite.addTest(new TestSuite(TcpCommunicationSpiDropNodesTest.class));
+ suite.addTest(new TestSuite(TcpCommunicationSpiHalfOpenedConnectionTest.class));
return suite;
}