You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2021/10/28 18:09:54 UTC

[qpid-dispatch] 03/03: DISPATCH-2261: close dispatcher out link on connector delete

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch 1.17.x
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 4930a80f6381918b7e4e5b16206973e2e53146b6
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Tue Oct 26 09:05:12 2021 -0400

    DISPATCH-2261: close dispatcher out link on connector delete
    
    This closes #1398
---
 src/adaptors/tcp_adaptor.c        |  38 +++++---
 tests/system_test.py              |   2 +-
 tests/system_tests_tcp_adaptor.py | 187 +++++++++++++++++---------------------
 3 files changed, 111 insertions(+), 116 deletions(-)

diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 3e71611..57ab8bc 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -140,6 +140,7 @@ static void handle_disconnected(qdr_tcp_connection_t* conn);
 static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn);
 static void free_bridge_config(qd_tcp_bridge_t *config);
 static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc);
+static void detach_links(qdr_tcp_connection_t *tc);
 
 
 // is the incoming byte window full
@@ -195,6 +196,7 @@ static void on_activate(void *context)
     qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] on_activate", conn->conn_id);
     while (qdr_connection_process(conn->qdr_conn)) {}
     if (conn->egress_dispatcher && conn->connector_closed) {
+        detach_links(conn);
         qdr_connection_set_context(conn->qdr_conn, 0);
         qdr_connection_closed(conn->qdr_conn);
         conn->qdr_conn = 0;
@@ -473,25 +475,18 @@ static void handle_disconnected(qdr_tcp_connection_t* conn)
         qd_message_set_receive_complete(qdr_delivery_message(conn->instream));
         qdr_delivery_continue(tcp_adaptor->core, conn->instream, true);
         qdr_delivery_decref(tcp_adaptor->core, conn->instream, "tcp-adaptor.handle_disconnected - instream");
+        conn->instream = 0;
     }
     if (conn->outstream) {
         qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
                "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close outstream",
                conn->conn_id, conn->outgoing_id);
         qdr_delivery_decref(tcp_adaptor->core, conn->outstream, "tcp-adaptor.handle_disconnected - outstream");
+        conn->outstream = 0;
     }
-    if (conn->incoming) {
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
-               "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach incoming",
-               conn->conn_id, conn->incoming_id);
-        qdr_link_detach(conn->incoming, QD_LOST, 0);
-    }
-    if (conn->outgoing) {
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
-               "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach outgoing",
-               conn->conn_id, conn->outgoing_id);
-        qdr_link_detach(conn->outgoing, QD_LOST, 0);
-    }
+
+    detach_links(conn);
+
     if (conn->initial_delivery) {
         qdr_delivery_remote_state_updated(tcp_adaptor->core, conn->initial_delivery, PN_RELEASED, true, 0, false);
         qdr_delivery_decref(tcp_adaptor->core, conn->initial_delivery, "tcp-adaptor.handle_disconnected - initial_delivery");
@@ -2060,3 +2055,22 @@ static void qdr_del_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bo
         free_qdr_tcp_connection(conn);
     }
 }
+
+
+static void detach_links(qdr_tcp_connection_t *conn)
+{
+    if (conn->incoming) {
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] detaching incoming link",
+               conn->conn_id, conn->incoming_id);
+        qdr_link_detach(conn->incoming, QD_LOST, 0);
+        conn->incoming = 0;
+    }
+    if (conn->outgoing) {
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] detaching outgoing link",
+               conn->conn_id, conn->outgoing_id);
+        qdr_link_detach(conn->outgoing, QD_LOST, 0);
+        conn->outgoing = 0;
+    }
+}
diff --git a/tests/system_test.py b/tests/system_test.py
index 08c96ae..0c45439 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -657,7 +657,7 @@ class Qdrouterd(Process):
 
         def check():
             addrs = self.management.query(a_type).get_dicts()
-            rc = [a for a in addrs if address in a['name']]
+            rc = [a for a in addrs if a['name'].endswith(address)]
             count = 0
             for a in rc:
                 count += a['subscriberCount']
diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py
index fe26dd4..35cc1d7 100644
--- a/tests/system_tests_tcp_adaptor.py
+++ b/tests/system_tests_tcp_adaptor.py
@@ -20,6 +20,7 @@
 import io
 import json
 import os
+import socket
 import sys
 import time
 import traceback
@@ -996,39 +997,67 @@ class TcpAdaptorManagementTest(TestCase):
         cls.tcp_listener_port = cls.tester.get_port()
         cls.test_name = 'TCPMgmtTest'
 
-        # Here we have a simple barebones standalone router config.
-        config = [
-            ('router', {'mode': 'standalone',
-                        'id': cls.test_name}),
+        # create edge and interior routers.  The listener/connector will be on
+        # the edge router.  It is expected that the edge will create proxy
+        # links to the interior and remove them when the test is done.
+
+        cls.interior_edge_port = cls.tester.get_port()
+        cls.interior_mgmt_port = cls.tester.get_port()
+        cls.edge_mgmt_port = cls.tester.get_port()
+
+        cls.tcp_server_port = cls.tester.get_port()
+        cls.tcp_listener_port = cls.tester.get_port()
+
+        i_config = [
+            ('router', {'mode': 'interior',
+                        'id': 'TCPMgmtTestInterior'}),
             ('listener', {'role': 'normal',
-                          'port': cls.tester.get_port()}),
+                          'port': cls.interior_mgmt_port}),
+            ('listener', {'role': 'edge', 'port': cls.interior_edge_port}),
+            ('address', {'prefix': 'closest',   'distribution': 'closest'}),
+            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
         ]
-        config = Qdrouterd.Config(config)
-        cls.router = cls.tester.qdrouterd(cls.test_name, config, wait=True)
+        config = Qdrouterd.Config(i_config)
+        cls.i_router = cls.tester.qdrouterd('TCPMgmtTestInterior', config, wait=False)
 
-        # Start the echo server. This is the server that the tcpConnector
-        # will be connecting to.
-        server_prefix = "ECHO_SERVER ES_%s" % cls.test_name
-        parent_path = os.path.dirname(os.getcwd())
-        cls.logger = Logger(title="TcpAdaptor",
-                            print_to_console=True,
-                            save_for_dump=False,
-                            ofilename=os.path.join(parent_path, "setUpClass/TcpAdaptor_echo_server.log"))
-        cls.echo_server = TcpEchoServer(prefix=server_prefix,
-                                        port=cls.tcp_server_port,
-                                        logger=cls.logger)
-        # The router and the echo server are running at this point.
-        assert cls.echo_server.is_running
+        e_config = [
+            ('router', {'mode': 'edge',
+                        'id': 'TCPMgmtTestEdge'}),
+            ('listener', {'role': 'normal',
+                          'port': cls.edge_mgmt_port}),
+            ('connector', {'name': 'edge', 'role': 'edge',
+                           'port': cls.interior_edge_port}),
+            ('address', {'prefix': 'closest',   'distribution': 'closest'}),
+            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+        ]
+        config = Qdrouterd.Config(e_config)
+        cls.e_router = cls.tester.qdrouterd('TCPMgmtTestEdge', config,
+                                            wait=False)
+
+        cls.i_router.wait_ready()
+        cls.e_router.wait_ready()
+
+    def _query_links_by_addr(self, router_mgmt, owning_addr):
+        oid = 'org.apache.qpid.dispatch.router.link'
+        attrs = ['owningAddr', 'linkDir']
+
+        links = []
+        rc = router_mgmt.query(type=oid, attribute_names=attrs).results
+        for link in rc:
+            if link[0] is not None and link[0].endswith(owning_addr):
+                links.append(link)
+        return links
 
     @unittest.skipIf(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON)
     def test_01_mgmt(self):
         """
-        Create and delete TCP connectors and listeners
+        Create and delete TCP connectors and listeners. Ensure that the service
+        address is properly removed on the interior router.
         """
         LISTENER_TYPE = 'org.apache.qpid.dispatch.tcpListener'
         CONNECTOR_TYPE = 'org.apache.qpid.dispatch.tcpConnector'
 
-        mgmt = self.router.management
+        mgmt = self.e_router.management
 
         # When starting out, there should be no tcpListeners or tcpConnectors.
         self.assertEqual(0, len(mgmt.query(type=LISTENER_TYPE).results))
@@ -1052,93 +1081,45 @@ class TcpAdaptorManagementTest(TestCase):
         self.assertEqual(1, len(mgmt.query(type=LISTENER_TYPE).results))
         self.assertEqual(1, len(mgmt.query(type=CONNECTOR_TYPE).results))
 
-        # Give a second for the tcpListener to start listening.
-        time.sleep(1)
-        # Start the echo client runner
-        client_runner_timeout = 3
-        runner = EchoClientRunner(self.test_name, 1, self.logger,
-                                  None, None, 100, 1,
-                                  timeout=client_runner_timeout,
-                                  port_override=self.tcp_listener_port)
-        result = None
+        # now verify that the interior router sees the service address
+        # and two proxy links are created
+        self.i_router.wait_address(self.test_name, subscribers=1)
+        while True:
+            links = self._query_links_by_addr(self.i_router.management,
+                                              self.test_name)
+            if links:
+                # expect a single consumer link that represents
+                # the connector
+                self.assertEqual(1, len(links))
+                self.assertEqual("out", links[0][1])
+                break
+            time.sleep(0.25)
 
-        # Give some time for the client runner to finish up.
-        time.sleep(client_runner_timeout + 1)
-
-        # Make sure servers are still up
-        if self.echo_server.error:
-            self.logger.log(
-                "TCP_TEST %s Server %s stopped with error: %s" %
-                (self.test_name, self.echo_server.prefix,
-                 self.echo_server.error))
-            result = self.echo_server.error
-
-        if self.echo_server.exit_status:
-            self.logger.log(
-                "TCP_TEST %s Server %s stopped with status: %s" %
-                (self.test_name, self.echo_server.prefix, self.echo_server.exit_status))
-            result = self.echo_server.exit_status
-
-        self.assertIsNone(result)
-
-        error = runner.client_error()
-        if error is not None:
-            self.logger.log("TCP_TEST %s Client %s stopped with error: %s" %
-                            (self.test_name, runner.name, error))
-
-        self.assertIsNone(error)
-        status = runner.client_exit_status()
-        if status is not None:
-            self.logger.log("TCP_TEST %s Client %s stopped with status: %s" %
-                            (self.test_name, runner.name, status))
-        self.assertIsNone(status)
-        self.assertFalse(runner.client_running())
-
-        # Delete the connector and make sure the echo client fails.
+        # Delete the connector and listener
         out = mgmt.delete(type=CONNECTOR_TYPE, name=connector_name)
         self.assertIsNone(out)
-
-        # Give some time for the connector to be deleted by the router.
-        # Deleting a connector also involves deleting existing connections
-        # that were made using the details from the connector.
-        # In this case, the router would have to drop the connection it
-        # already made to the echo server, so let's give it some time to
-        # do that.
-        time.sleep(2)
-
-        client_runner_timeout = 2
-        # Start the echo client runner
-        runner = EchoClientRunner(self.test_name, 1, self.logger,
-                                  None, None, 100, 1,
-                                  # Try for 2 seconds before timing out
-                                  timeout=client_runner_timeout,
-                                  port_override=self.tcp_listener_port)
-        time.sleep(client_runner_timeout + 1)
-        exit_status = runner.client_exit_status()
-
-        if exit_status is not None:
-            # The test is a success, the echo client sender timed out
-            # because it did not receive anything back from the
-            # echo server because the connector to the echo server
-            # got deleted
-            self.logger.log("TCP_TEST %s Client %s timedout with error: %s" %
-                            (self.test_name, runner.name, exit_status))
-        else:
-            self.logger.log("ERROR: Connector not deleted")
-        self.assertIsNotNone(exit_status)
-
-        # Now delete the tcpListener
+        self.assertEqual(0, len(mgmt.query(type=CONNECTOR_TYPE).results))
         out = mgmt.delete(type=LISTENER_TYPE, name=listener_name)
         self.assertIsNone(out)
+        self.assertEqual(0, len(mgmt.query(type=LISTENER_TYPE).results))
 
-        runner = EchoClientRunner(self.test_name, 1, self.logger,
-                                  None, None, 100, 1,
-                                  # Try for 2 seconds before timing out
-                                  timeout=client_runner_timeout,
-                                  port_override=self.tcp_listener_port)
-        time.sleep(client_runner_timeout + 1)
-        error = runner.client_error()
-        self.assertIn("ConnectionRefusedError", error)
+        # verify the service address and proxy links are no longer active on
+        # the interior router
+        self.i_router.wait_address_unsubscribed(self.test_name)
+        while True:
+            links = self._query_links_by_addr(self.i_router.management,
+                                              self.test_name)
+            if len(links) == 0:
+                break
+            time.sleep(0.25)
+
+        # verify that clients can no longer connect to the listener
+        client_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        client_conn.setblocking(True)
+        client_conn.settimeout(5)
+        with self.assertRaises(ConnectionRefusedError):
+            client_conn.connect(('127.0.0.1', self.tcp_listener_port))
+        client_conn.close()
 
 
 if __name__ == '__main__':

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org