You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/13 09:52:19 UTC

[25/28] ignite git commit: IGNITE-6818 Handle half open connection in communication.

IGNITE-6818 Handle half open connection in communication.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/191295d4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/191295d4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/191295d4

Branch: refs/heads/ignite-zk
Commit: 191295d45f53225d9e1e214c6fdd85b59e80d0ec
Parents: 132ec3f
Author: dkarachentsev <dk...@gridgain.com>
Authored: Mon Nov 13 10:35:21 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Nov 13 10:35:21 2017 +0300

----------------------------------------------------------------------
 .../communication/tcp/TcpCommunicationSpi.java  |  37 +++--
 ...ommunicationSpiHalfOpenedConnectionTest.java | 142 +++++++++++++++++++
 .../IgniteSpiCommunicationSelfTestSuite.java    |   2 +
 3 files changed, 168 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/191295d4/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1bff8ee..49425ce 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -539,15 +539,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         if (c.failed) {
                             ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
 
-                            for (GridNioSession ses0 : nioSrvr.sessions()) {
-                                ConnectionKey key0 = ses0.meta(CONN_IDX_META);
-
-                                if (ses0.accepted() && key0 != null &&
-                                    key0.nodeId().equals(connKey.nodeId()) &&
-                                    key0.connectionIndex() == connKey.connectionIndex() &&
-                                    key0.connectCount() < connKey.connectCount())
-                                    ses0.close();
-                            }
+                            closeStaleConnections(connKey);
                         }
                     }
                 }
@@ -567,11 +559,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         if (oldClient instanceof GridTcpNioCommunicationClient) {
                             if (log.isInfoEnabled())
                                 log.info("Received incoming connection when already connected " +
-                                "to this node, rejecting [locNode=" + locNode.id() +
-                                ", rmtNode=" + sndId + ']');
+                                    "to this node, rejecting [locNode=" + locNode.id() +
+                                    ", rmtNode=" + sndId + ']');
 
                             ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
 
+                            closeStaleConnections(connKey);
+
                             return;
                         }
                         else {
@@ -599,11 +593,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
                                 if (log.isInfoEnabled())
                                     log.info("Received incoming connection when already connected " +
-                                    "to this node, rejecting [locNode=" + locNode.id() +
-                                    ", rmtNode=" + sndId + ']');
+                                        "to this node, rejecting [locNode=" + locNode.id() +
+                                        ", rmtNode=" + sndId + ']');
 
                                 ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
 
+                                closeStaleConnections(connKey);
+
                                 fut.onDone(oldClient);
 
                                 return;
@@ -658,6 +654,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 }
             }
 
+            /**
+             * @param connKey Connection key.
+             */
+            private void closeStaleConnections(ConnectionKey connKey) {
+                for (GridNioSession ses0 : nioSrvr.sessions()) {
+                    ConnectionKey key0 = ses0.meta(CONN_IDX_META);
+
+                    if (ses0.accepted() && key0 != null &&
+                        key0.nodeId().equals(connKey.nodeId()) &&
+                        key0.connectionIndex() == connKey.connectionIndex() &&
+                        key0.connectCount() < connKey.connectCount())
+                        ses0.close();
+                }
+            }
+
             @Override public void onMessage(final GridNioSession ses, Message msg) {
                 ConnectionKey connKey = ses.meta(CONN_IDX_META);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/191295d4/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
new file mode 100644
index 0000000..3e10f94
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.nio.GridCommunicationClient;
+import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests case when connection is closed only for one side, when other is not notified.
+ */
+public class TcpCommunicationSpiHalfOpenedConnectionTest extends GridCommonAbstractTest {
+    /** Client spi. */
+    private TcpCommunicationSpi clientSpi;
+
+    /** Paired connections. */
+    private boolean pairedConnections;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        if (igniteInstanceName.contains("client")) {
+            cfg.setClientMode(true);
+
+            clientSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi();
+        }
+
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setUsePairedConnections(pairedConnections);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnect() throws Exception {
+        pairedConnections = false;
+
+        checkReconnect();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectPaired() throws Exception {
+        pairedConnections = true;
+
+        checkReconnect();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkReconnect() throws Exception {
+        Ignite srv = startGrid("server");
+        Ignite client = startGrid("client");
+
+        UUID nodeId = srv.cluster().localNode().id();
+
+        System.out.println(">> Server ID: " + nodeId);
+
+        ClusterGroup srvGrp = client.cluster().forNodeId(nodeId);
+
+        System.out.println(">> Send job");
+
+        // Establish connection
+        client.compute(srvGrp).run(F.noop());
+
+        ConcurrentMap<UUID, GridCommunicationClient[]> clients = U.field(clientSpi, "clients");
+        ConcurrentMap<?, GridNioRecoveryDescriptor> recoveryDescs = U.field(clientSpi, "recoveryDescs");
+        ConcurrentMap<?, GridNioRecoveryDescriptor> outRecDescs = U.field(clientSpi, "outRecDescs");
+        ConcurrentMap<?, GridNioRecoveryDescriptor> inRecDescs = U.field(clientSpi, "inRecDescs");
+        GridNioServerListener<Message> lsnr = U.field(clientSpi, "srvLsnr");
+
+        Iterator<GridNioRecoveryDescriptor> it = F.concat(
+            recoveryDescs.values().iterator(),
+            outRecDescs.values().iterator(),
+            inRecDescs.values().iterator()
+        );
+
+        while (it.hasNext()) {
+            GridNioRecoveryDescriptor desc = it.next();
+
+            // Need to simulate connection close in GridNioServer as it
+            // releases descriptors on disconnect.
+            desc.release();
+        }
+
+        // Remove client to avoid calling close(), in that case server
+        // will close connection too, but we want to keep the server
+        // uninformed and force ping old connection.
+        GridCommunicationClient[] clients0 = clients.remove(nodeId);
+
+        for (GridCommunicationClient commClient : clients0)
+            lsnr.onDisconnected(((GridTcpNioCommunicationClient)commClient).session(), new IOException("Test exception"));
+
+        info(">> Removed client");
+
+        // Reestablish connection
+        client.compute(srvGrp).run(F.noop());
+
+        info(">> Sent second job");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 30_000;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/191295d4/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index 77de3fc..8e96a3f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -38,6 +38,7 @@ import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiTcpSelfTes
 import org.apache.ignite.spi.communication.tcp.IgniteTcpCommunicationRecoveryAckClosureSelfTest;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiDropNodesTest;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFaultyClientTest;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiHalfOpenedConnectionTest;
 
 /**
  * Test suite for all communication SPIs.
@@ -78,6 +79,7 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
 
         suite.addTest(new TestSuite(TcpCommunicationSpiFaultyClientTest.class));
         suite.addTest(new TestSuite(TcpCommunicationSpiDropNodesTest.class));
+        suite.addTest(new TestSuite(TcpCommunicationSpiHalfOpenedConnectionTest.class));
 
         return suite;
     }