You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2021/10/28 18:09:51 UTC

[qpid-dispatch] branch 1.17.x updated (c949a77 -> 4930a80)

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a change to branch 1.17.x
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git.


    from c949a77  DISPATCH-2259: use hostname when setting connection hostname
     new 1dfc140  DISPATCH-2260: HTTP/1.x: fix deletion of httpConnector and httpListener
     new fd44505  DISPATCH-2264: Fix for router crash when deleting listener
     new 4930a80  DISPATCH-2261: close dispatcher out link on connector delete

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 include/qpid/dispatch/protocol_adaptor.h |  12 ++
 src/adaptors/http1/http1_adaptor.c       |  19 +--
 src/adaptors/http1/http1_client.c        |  26 +++-
 src/adaptors/http1/http1_private.h       |   6 +-
 src/adaptors/http1/http1_server.c        |  73 +++++++-----
 src/adaptors/http2/http2_adaptor.c       |  37 ++++--
 src/adaptors/tcp_adaptor.c               |  38 ++++--
 src/router_core/connections.c            |   6 +-
 src/router_core/router_core_private.h    |  15 +--
 tests/system_test.py                     |   2 +-
 tests/system_tests_http1_adaptor.py      | 197 ++++++++++++++++++++++---------
 tests/system_tests_http2.py              |  54 ++++++++-
 tests/system_tests_tcp_adaptor.py        | 187 +++++++++++++----------------
 13 files changed, 425 insertions(+), 247 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 02/03: DISPATCH-2264: Fix for router crash when deleting listener

Posted by kg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch 1.17.x
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit fd44505ca630e30ddca055723b2c80dd6ff8b065
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Wed Oct 27 12:07:11 2021 -0400

    DISPATCH-2264: Fix for router crash when deleting listener
---
 src/adaptors/http2/http2_adaptor.c | 37 +++++++++++++++++---------
 tests/system_tests_http2.py        | 54 ++++++++++++++++++++++++++++++++++++--
 2 files changed, 77 insertions(+), 14 deletions(-)

diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c
index 19ed791..0ff8e10 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -2648,9 +2648,25 @@ static void handle_listener_event(pn_event_t *e, qd_server_t *qd_server, void *c
         }
         break;
 
-        case PN_LISTENER_CLOSE:
-            qd_log(log, QD_LOG_INFO, "Closing HTTP connection on %s", host_port);
-            break;
+        case PN_LISTENER_CLOSE: {
+            if (li->pn_listener) {
+                pn_condition_t *cond = pn_listener_condition(li->pn_listener);
+                if (pn_condition_is_set(cond)) {
+                    qd_log(log, QD_LOG_ERROR, "Listener error on %s: %s (%s)", host_port, pn_condition_get_description(cond), pn_condition_get_name(cond));
+                }
+                else {
+                    qd_log(log, QD_LOG_TRACE, "Listener closed on %s", host_port);
+                }
+            }
+
+            sys_mutex_lock(http2_adaptor->lock);
+            pn_listener_set_context(li->pn_listener, 0);
+            li->pn_listener = 0;
+            DEQ_REMOVE(http2_adaptor->listeners, li);
+            sys_mutex_unlock(http2_adaptor->lock);
+            qd_http_listener_decref(li);
+        }
+        break;
 
         default:
             break;
@@ -2682,18 +2698,13 @@ void qd_http2_delete_connector(qd_dispatch_t *qd, qd_http_connector_t *connector
  */
 void qd_http2_delete_listener(qd_dispatch_t *qd, qd_http_listener_t *li)
 {
+    sys_mutex_lock(http2_adaptor->lock);
     if (li) {
         if (li->pn_listener) {
             pn_listener_close(li->pn_listener);
-            li->pn_listener = 0;
         }
-        sys_mutex_lock(http2_adaptor->lock);
-        DEQ_REMOVE(http2_adaptor->listeners, li);
-        sys_mutex_unlock(http2_adaptor->lock);
-
-        qd_log(http2_adaptor->log_source, QD_LOG_INFO, "Deleted HttpListener for %s, %s:%s", li->config.address, li->config.host, li->config.port);
-        qd_http_listener_decref(li);
     }
+    sys_mutex_unlock(http2_adaptor->lock);
 }
 
 
@@ -2750,14 +2761,16 @@ static void qdr_http2_adaptor_final(void *adaptor_context)
     // Free all http listeners
     qd_http_listener_t *li = DEQ_HEAD(adaptor->listeners);
     while (li) {
-        qd_http2_delete_listener(0, li);
+        DEQ_REMOVE_HEAD(adaptor->listeners);
+        qd_http_listener_decref(li);
         li = DEQ_HEAD(adaptor->listeners);
     }
 
     // Free all http connectors
     qd_http_connector_t *ct = DEQ_HEAD(adaptor->connectors);
     while (ct) {
-        qd_http2_delete_connector(0, ct);
+        DEQ_REMOVE_HEAD(adaptor->connectors);
+        qd_http_connector_decref(ct);
         ct = DEQ_HEAD(adaptor->connectors);
     }
 
diff --git a/tests/system_tests_http2.py b/tests/system_tests_http2.py
index 7d09f6f..e09c494 100644
--- a/tests/system_tests_http2.py
+++ b/tests/system_tests_http2.py
@@ -218,6 +218,43 @@ class CommonHttp2Tests:
         digest_of_response_file = get_digest(self.router_qdra.outdir + image_file_name)
         self.assertEqual(digest_of_server_file, digest_of_response_file)
 
+    def check_listener_delete(self, client_addr, server_addr):
+        # Run curl 127.0.0.1:port --http2-prior-knowledge
+        # We are first making sure that the http request goes thru successfully.
+        out = self.run_curl(client_addr)
+        ret_string = ""
+        i = 0
+        while (i < 1000):
+            ret_string += str(i) + ","
+            i += 1
+        self.assertIn(ret_string, out)
+
+        qd_manager = QdManager(self, address=server_addr)
+        http_listeners = qd_manager.query('org.apache.qpid.dispatch.httpListener')
+        self.assertEqual(len(http_listeners), 1)
+
+        # Run a qdmanage DELETE on the httpListener
+        qd_manager.delete("org.apache.qpid.dispatch.httpListener", name=self.listener_name)
+
+        # Make sure the listener is gone
+        http_listeners  = qd_manager.query('org.apache.qpid.dispatch.httpListener')
+        self.assertEqual(len(http_listeners), 0)
+
+        # Try running a curl command against the listener to make sure it times out
+        request_timed_out = False
+        try:
+            out = self.run_curl(client_addr, timeout=3)
+        except Exception as e:
+            request_timed_out = True
+        self.assertTrue(request_timed_out)
+
+        # Add back the listener and run a curl command to make sure that the newly added listener is
+        # back up and running.
+        create_result = qd_manager.create("org.apache.qpid.dispatch.httpListener", self.http_listener_props)
+        sleep(2)
+        out = self.run_curl(client_addr)
+        self.assertIn(ret_string, out)
+
     def check_connector_delete(self, client_addr, server_addr):
         # Run curl 127.0.0.1:port --http2-prior-knowledge
         # We are first making sure that the http request goes thru successfully.
@@ -468,12 +505,20 @@ class Http2TestTwoRouter(Http2TestBase, CommonHttp2Tests):
                                                   server_file="http2_server.py")
         name = "http2-test-router"
         inter_router_port = cls.tester.get_port()
+        cls.http_listener_port = cls.tester.get_port()
+        cls.listener_name = 'listenerToBeDeleted'
+        cls.http_listener_props = {
+            'port': cls.http_listener_port,
+            'address': 'examples',
+            'host': '127.0.0.1',
+            'protocolVersion': 'HTTP2',
+            'name': cls.listener_name
+        }
 
         config_qdra = Qdrouterd.Config([
             ('router', {'mode': 'interior', 'id': 'QDR.A'}),
             ('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
-            ('httpListener', {'port': cls.tester.get_port(), 'address': 'examples',
-                              'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}),
+            ('httpListener', cls.http_listener_props),
             ('listener', {'role': 'inter-router', 'port': inter_router_port})
         ])
 
@@ -553,6 +598,11 @@ class Http2TestTwoRouter(Http2TestBase, CommonHttp2Tests):
         self.assertEqual(stats_b[0].get('bytesIn'), 3944)
 
     @unittest.skipIf(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
+    def test_yyy_http_listener_delete(self):
+        self.check_listener_delete(client_addr=self.router_qdra.http_addresses[0],
+                                   server_addr=self.router_qdra.addresses[0])
+
+    @unittest.skipIf(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
     def test_zzz_http_connector_delete(self):
         self.check_connector_delete(client_addr=self.router_qdra.http_addresses[0],
                                     server_addr=self.router_qdrb.addresses[0])

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 03/03: DISPATCH-2261: close dispatcher out link on connector delete

Posted by kg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch 1.17.x
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 4930a80f6381918b7e4e5b16206973e2e53146b6
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Tue Oct 26 09:05:12 2021 -0400

    DISPATCH-2261: close dispatcher out link on connector delete
    
    This closes #1398
---
 src/adaptors/tcp_adaptor.c        |  38 +++++---
 tests/system_test.py              |   2 +-
 tests/system_tests_tcp_adaptor.py | 187 +++++++++++++++++---------------------
 3 files changed, 111 insertions(+), 116 deletions(-)

diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 3e71611..57ab8bc 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -140,6 +140,7 @@ static void handle_disconnected(qdr_tcp_connection_t* conn);
 static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn);
 static void free_bridge_config(qd_tcp_bridge_t *config);
 static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc);
+static void detach_links(qdr_tcp_connection_t *tc);
 
 
 // is the incoming byte window full
@@ -195,6 +196,7 @@ static void on_activate(void *context)
     qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] on_activate", conn->conn_id);
     while (qdr_connection_process(conn->qdr_conn)) {}
     if (conn->egress_dispatcher && conn->connector_closed) {
+        detach_links(conn);
         qdr_connection_set_context(conn->qdr_conn, 0);
         qdr_connection_closed(conn->qdr_conn);
         conn->qdr_conn = 0;
@@ -473,25 +475,18 @@ static void handle_disconnected(qdr_tcp_connection_t* conn)
         qd_message_set_receive_complete(qdr_delivery_message(conn->instream));
         qdr_delivery_continue(tcp_adaptor->core, conn->instream, true);
         qdr_delivery_decref(tcp_adaptor->core, conn->instream, "tcp-adaptor.handle_disconnected - instream");
+        conn->instream = 0;
     }
     if (conn->outstream) {
         qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
                "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close outstream",
                conn->conn_id, conn->outgoing_id);
         qdr_delivery_decref(tcp_adaptor->core, conn->outstream, "tcp-adaptor.handle_disconnected - outstream");
+        conn->outstream = 0;
     }
-    if (conn->incoming) {
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
-               "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach incoming",
-               conn->conn_id, conn->incoming_id);
-        qdr_link_detach(conn->incoming, QD_LOST, 0);
-    }
-    if (conn->outgoing) {
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
-               "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach outgoing",
-               conn->conn_id, conn->outgoing_id);
-        qdr_link_detach(conn->outgoing, QD_LOST, 0);
-    }
+
+    detach_links(conn);
+
     if (conn->initial_delivery) {
         qdr_delivery_remote_state_updated(tcp_adaptor->core, conn->initial_delivery, PN_RELEASED, true, 0, false);
         qdr_delivery_decref(tcp_adaptor->core, conn->initial_delivery, "tcp-adaptor.handle_disconnected - initial_delivery");
@@ -2060,3 +2055,22 @@ static void qdr_del_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bo
         free_qdr_tcp_connection(conn);
     }
 }
+
+
+static void detach_links(qdr_tcp_connection_t *conn)
+{
+    if (conn->incoming) {
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] detaching incoming link",
+               conn->conn_id, conn->incoming_id);
+        qdr_link_detach(conn->incoming, QD_LOST, 0);
+        conn->incoming = 0;
+    }
+    if (conn->outgoing) {
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] detaching outgoing link",
+               conn->conn_id, conn->outgoing_id);
+        qdr_link_detach(conn->outgoing, QD_LOST, 0);
+        conn->outgoing = 0;
+    }
+}
diff --git a/tests/system_test.py b/tests/system_test.py
index 08c96ae..0c45439 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -657,7 +657,7 @@ class Qdrouterd(Process):
 
         def check():
             addrs = self.management.query(a_type).get_dicts()
-            rc = [a for a in addrs if address in a['name']]
+            rc = [a for a in addrs if a['name'].endswith(address)]
             count = 0
             for a in rc:
                 count += a['subscriberCount']
diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py
index fe26dd4..35cc1d7 100644
--- a/tests/system_tests_tcp_adaptor.py
+++ b/tests/system_tests_tcp_adaptor.py
@@ -20,6 +20,7 @@
 import io
 import json
 import os
+import socket
 import sys
 import time
 import traceback
@@ -996,39 +997,67 @@ class TcpAdaptorManagementTest(TestCase):
         cls.tcp_listener_port = cls.tester.get_port()
         cls.test_name = 'TCPMgmtTest'
 
-        # Here we have a simple barebones standalone router config.
-        config = [
-            ('router', {'mode': 'standalone',
-                        'id': cls.test_name}),
+        # create edge and interior routers.  The listener/connector will be on
+        # the edge router.  It is expected that the edge will create proxy
+        # links to the interior and remove them when the test is done.
+
+        cls.interior_edge_port = cls.tester.get_port()
+        cls.interior_mgmt_port = cls.tester.get_port()
+        cls.edge_mgmt_port = cls.tester.get_port()
+
+        cls.tcp_server_port = cls.tester.get_port()
+        cls.tcp_listener_port = cls.tester.get_port()
+
+        i_config = [
+            ('router', {'mode': 'interior',
+                        'id': 'TCPMgmtTestInterior'}),
             ('listener', {'role': 'normal',
-                          'port': cls.tester.get_port()}),
+                          'port': cls.interior_mgmt_port}),
+            ('listener', {'role': 'edge', 'port': cls.interior_edge_port}),
+            ('address', {'prefix': 'closest',   'distribution': 'closest'}),
+            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
         ]
-        config = Qdrouterd.Config(config)
-        cls.router = cls.tester.qdrouterd(cls.test_name, config, wait=True)
+        config = Qdrouterd.Config(i_config)
+        cls.i_router = cls.tester.qdrouterd('TCPMgmtTestInterior', config, wait=False)
 
-        # Start the echo server. This is the server that the tcpConnector
-        # will be connecting to.
-        server_prefix = "ECHO_SERVER ES_%s" % cls.test_name
-        parent_path = os.path.dirname(os.getcwd())
-        cls.logger = Logger(title="TcpAdaptor",
-                            print_to_console=True,
-                            save_for_dump=False,
-                            ofilename=os.path.join(parent_path, "setUpClass/TcpAdaptor_echo_server.log"))
-        cls.echo_server = TcpEchoServer(prefix=server_prefix,
-                                        port=cls.tcp_server_port,
-                                        logger=cls.logger)
-        # The router and the echo server are running at this point.
-        assert cls.echo_server.is_running
+        e_config = [
+            ('router', {'mode': 'edge',
+                        'id': 'TCPMgmtTestEdge'}),
+            ('listener', {'role': 'normal',
+                          'port': cls.edge_mgmt_port}),
+            ('connector', {'name': 'edge', 'role': 'edge',
+                           'port': cls.interior_edge_port}),
+            ('address', {'prefix': 'closest',   'distribution': 'closest'}),
+            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+        ]
+        config = Qdrouterd.Config(e_config)
+        cls.e_router = cls.tester.qdrouterd('TCPMgmtTestEdge', config,
+                                            wait=False)
+
+        cls.i_router.wait_ready()
+        cls.e_router.wait_ready()
+
+    def _query_links_by_addr(self, router_mgmt, owning_addr):
+        oid = 'org.apache.qpid.dispatch.router.link'
+        attrs = ['owningAddr', 'linkDir']
+
+        links = []
+        rc = router_mgmt.query(type=oid, attribute_names=attrs).results
+        for link in rc:
+            if link[0] is not None and link[0].endswith(owning_addr):
+                links.append(link)
+        return links
 
     @unittest.skipIf(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON)
     def test_01_mgmt(self):
         """
-        Create and delete TCP connectors and listeners
+        Create and delete TCP connectors and listeners. Ensure that the service
+        address is properly removed on the interior router.
         """
         LISTENER_TYPE = 'org.apache.qpid.dispatch.tcpListener'
         CONNECTOR_TYPE = 'org.apache.qpid.dispatch.tcpConnector'
 
-        mgmt = self.router.management
+        mgmt = self.e_router.management
 
         # When starting out, there should be no tcpListeners or tcpConnectors.
         self.assertEqual(0, len(mgmt.query(type=LISTENER_TYPE).results))
@@ -1052,93 +1081,45 @@ class TcpAdaptorManagementTest(TestCase):
         self.assertEqual(1, len(mgmt.query(type=LISTENER_TYPE).results))
         self.assertEqual(1, len(mgmt.query(type=CONNECTOR_TYPE).results))
 
-        # Give a second for the tcpListener to start listening.
-        time.sleep(1)
-        # Start the echo client runner
-        client_runner_timeout = 3
-        runner = EchoClientRunner(self.test_name, 1, self.logger,
-                                  None, None, 100, 1,
-                                  timeout=client_runner_timeout,
-                                  port_override=self.tcp_listener_port)
-        result = None
+        # now verify that the interior router sees the service address
+        # and two proxy links are created
+        self.i_router.wait_address(self.test_name, subscribers=1)
+        while True:
+            links = self._query_links_by_addr(self.i_router.management,
+                                              self.test_name)
+            if links:
+                # expect a single consumer link that represents
+                # the connector
+                self.assertEqual(1, len(links))
+                self.assertEqual("out", links[0][1])
+                break
+            time.sleep(0.25)
 
-        # Give some time for the client runner to finish up.
-        time.sleep(client_runner_timeout + 1)
-
-        # Make sure servers are still up
-        if self.echo_server.error:
-            self.logger.log(
-                "TCP_TEST %s Server %s stopped with error: %s" %
-                (self.test_name, self.echo_server.prefix,
-                 self.echo_server.error))
-            result = self.echo_server.error
-
-        if self.echo_server.exit_status:
-            self.logger.log(
-                "TCP_TEST %s Server %s stopped with status: %s" %
-                (self.test_name, self.echo_server.prefix, self.echo_server.exit_status))
-            result = self.echo_server.exit_status
-
-        self.assertIsNone(result)
-
-        error = runner.client_error()
-        if error is not None:
-            self.logger.log("TCP_TEST %s Client %s stopped with error: %s" %
-                            (self.test_name, runner.name, error))
-
-        self.assertIsNone(error)
-        status = runner.client_exit_status()
-        if status is not None:
-            self.logger.log("TCP_TEST %s Client %s stopped with status: %s" %
-                            (self.test_name, runner.name, status))
-        self.assertIsNone(status)
-        self.assertFalse(runner.client_running())
-
-        # Delete the connector and make sure the echo client fails.
+        # Delete the connector and listener
         out = mgmt.delete(type=CONNECTOR_TYPE, name=connector_name)
         self.assertIsNone(out)
-
-        # Give some time for the connector to be deleted by the router.
-        # Deleting a connector also involves deleting existing connections
-        # that were made using the details from the connector.
-        # In this case, the router would have to drop the connection it
-        # already made to the echo server, so let's give it some time to
-        # do that.
-        time.sleep(2)
-
-        client_runner_timeout = 2
-        # Start the echo client runner
-        runner = EchoClientRunner(self.test_name, 1, self.logger,
-                                  None, None, 100, 1,
-                                  # Try for 2 seconds before timing out
-                                  timeout=client_runner_timeout,
-                                  port_override=self.tcp_listener_port)
-        time.sleep(client_runner_timeout + 1)
-        exit_status = runner.client_exit_status()
-
-        if exit_status is not None:
-            # The test is a success, the echo client sender timed out
-            # because it did not receive anything back from the
-            # echo server because the connector to the echo server
-            # got deleted
-            self.logger.log("TCP_TEST %s Client %s timedout with error: %s" %
-                            (self.test_name, runner.name, exit_status))
-        else:
-            self.logger.log("ERROR: Connector not deleted")
-        self.assertIsNotNone(exit_status)
-
-        # Now delete the tcpListener
+        self.assertEqual(0, len(mgmt.query(type=CONNECTOR_TYPE).results))
         out = mgmt.delete(type=LISTENER_TYPE, name=listener_name)
         self.assertIsNone(out)
+        self.assertEqual(0, len(mgmt.query(type=LISTENER_TYPE).results))
 
-        runner = EchoClientRunner(self.test_name, 1, self.logger,
-                                  None, None, 100, 1,
-                                  # Try for 2 seconds before timing out
-                                  timeout=client_runner_timeout,
-                                  port_override=self.tcp_listener_port)
-        time.sleep(client_runner_timeout + 1)
-        error = runner.client_error()
-        self.assertIn("ConnectionRefusedError", error)
+        # verify the service address and proxy links are no longer active on
+        # the interior router
+        self.i_router.wait_address_unsubscribed(self.test_name)
+        while True:
+            links = self._query_links_by_addr(self.i_router.management,
+                                              self.test_name)
+            if len(links) == 0:
+                break
+            time.sleep(0.25)
+
+        # verify that clients can no longer connect to the listener
+        client_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        client_conn.setblocking(True)
+        client_conn.settimeout(5)
+        with self.assertRaises(ConnectionRefusedError):
+            client_conn.connect(('127.0.0.1', self.tcp_listener_port))
+        client_conn.close()
 
 
 if __name__ == '__main__':

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 01/03: DISPATCH-2260: HTTP/1.x: fix deletion of httpConnector and httpListener

Posted by kg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch 1.17.x
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 1dfc140ff6d05027da4a0fb69138f1e280cb9b41
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Mon Oct 25 15:32:43 2021 -0400

    DISPATCH-2260: HTTP/1.x: fix deletion of httpConnector and httpListener
    
    This closes #1394
---
 include/qpid/dispatch/protocol_adaptor.h |  12 ++
 src/adaptors/http1/http1_adaptor.c       |  19 +--
 src/adaptors/http1/http1_client.c        |  26 +++-
 src/adaptors/http1/http1_private.h       |   6 +-
 src/adaptors/http1/http1_server.c        |  73 +++++++-----
 src/router_core/connections.c            |   6 +-
 src/router_core/router_core_private.h    |  15 +--
 tests/system_tests_http1_adaptor.py      | 197 ++++++++++++++++++++++---------
 8 files changed, 237 insertions(+), 117 deletions(-)

diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h
index 654fe74..00551c2 100644
--- a/include/qpid/dispatch/protocol_adaptor.h
+++ b/include/qpid/dispatch/protocol_adaptor.h
@@ -323,6 +323,18 @@ void qdr_protocol_adaptor_free(qdr_core_t *core, qdr_protocol_adaptor_t *adaptor
  */
 
 typedef enum {
+    QD_CONN_OPER_UP,
+    QD_CONN_OPER_DOWN,
+} qd_conn_oper_status_t;
+
+
+typedef enum {
+    QD_CONN_ADMIN_ENABLED,
+    QD_CONN_ADMIN_DELETED
+} qd_conn_admin_status_t;
+
+
+typedef enum {
     QD_LINK_ENDPOINT,      ///< A link to a connected endpoint
     QD_LINK_CONTROL,       ///< A link to a peer router for control messages
     QD_LINK_ROUTER,        ///< A link to a peer router for routed messages
diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
index 64b2a17..3828e89 100644
--- a/src/adaptors/http1/http1_adaptor.c
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -405,7 +405,8 @@ void qdr_http1_q2_unblocked_handler(const qd_alloc_safe_ptr_t context)
 //
 
 
-// Invoked by the core thread to wake an I/O thread for the connection
+// Invoked by the core/mgmt thread to wake an I/O thread for the connection.
+// Must be thread safe.
 //
 static void _core_connection_activate_CT(void *context, qdr_connection_t *conn)
 {
@@ -670,21 +671,23 @@ static void qd_http1_adaptor_final(void *adaptor_context)
     qdr_http1_adaptor_t *adaptor = (qdr_http1_adaptor_t*) adaptor_context;
     qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor);
 
+    qdr_http1_connection_t *hconn = DEQ_HEAD(adaptor->connections);
+    while (hconn) {
+        qdr_http1_connection_free(hconn);
+        hconn = DEQ_HEAD(adaptor->connections);
+    }
     qd_http_listener_t *li = DEQ_HEAD(adaptor->listeners);
     while (li) {
-        qd_http1_delete_listener(0, li);
+        DEQ_REMOVE_HEAD(qdr_http1_adaptor->listeners);
+        qd_http_listener_decref(li);
         li = DEQ_HEAD(adaptor->listeners);
     }
     qd_http_connector_t *ct = DEQ_HEAD(adaptor->connectors);
     while (ct) {
-        qd_http1_delete_connector(0, ct);
+        DEQ_REMOVE_HEAD(qdr_http1_adaptor->connectors);
+        qd_http_connector_decref(ct);
         ct = DEQ_HEAD(adaptor->connectors);
     }
-    qdr_http1_connection_t *hconn = DEQ_HEAD(adaptor->connections);
-    while (hconn) {
-        qdr_http1_connection_free(hconn);
-        hconn = DEQ_HEAD(adaptor->connections);
-    }
 
     sys_mutex_free(adaptor->lock);
     qdr_http1_adaptor =  NULL;
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 9a6b845..fa73560 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -132,6 +132,8 @@ static qdr_http1_connection_t *_create_client_connection(qd_http_listener_t *li)
 
     ZERO(hconn);
     hconn->type = HTTP1_CONN_CLIENT;
+    hconn->admin_status = QD_CONN_ADMIN_ENABLED;
+    hconn->oper_status = QD_CONN_OPER_DOWN;
     hconn->qd_server = li->server;
     hconn->adaptor = qdr_http1_adaptor;
     hconn->handler_context.handler = &_handle_connection_events;
@@ -219,8 +221,14 @@ static void _handle_listener_events(pn_event_t *e, qd_server_t *qd_server, void
             } else {
                 qd_log(log, QD_LOG_TRACE, "Listener closed on %s", host_port);
             }
+
+            sys_mutex_lock(qdr_http1_adaptor->lock);
             pn_listener_set_context(li->pn_listener, 0);
             li->pn_listener = 0;
+            DEQ_REMOVE(qdr_http1_adaptor->listeners, li);
+            sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+            qd_http_listener_decref(li);
         }
         break;
     }
@@ -233,6 +241,9 @@ static void _handle_listener_events(pn_event_t *e, qd_server_t *qd_server, void
 
 // Management Agent API - Create
 //
+// Note that this runs on the Management Agent thread, which may be running concurrently with the
+// I/O and timer threads.
+//
 qd_http_listener_t *qd_http1_configure_listener(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
 {
     qd_http_listener_t *li = qd_http_listener(qd->server, &_handle_listener_events);
@@ -256,19 +267,20 @@ qd_http_listener_t *qd_http1_configure_listener(qd_dispatch_t *qd, const qd_http
 
 // Management Agent API - Delete
 //
+// Note that this runs on the Management Agent thread, which may be running concurrently with the
+// I/O and timer threads.
+//
 void qd_http1_delete_listener(qd_dispatch_t *ignore, qd_http_listener_t *li)
 {
     if (li) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_INFO, "Deleting HttpListener for %s, %s:%s", li->config.address, li->config.host, li->config.port);
+        sys_mutex_lock(qdr_http1_adaptor->lock);
         if (li->pn_listener) {
+            // note that the proactor may immediately schedule the
+            // PN_LISTENER_CLOSED event on another thread...
             pn_listener_close(li->pn_listener);
-            li->pn_listener = 0;
         }
-        sys_mutex_lock(qdr_http1_adaptor->lock);
-        DEQ_REMOVE(qdr_http1_adaptor->listeners, li);
         sys_mutex_unlock(qdr_http1_adaptor->lock);
-
-        qd_log(qdr_http1_adaptor->log, QD_LOG_INFO, "Deleted HttpListener for %s, %s:%s", li->config.address, li->config.host, li->config.port);
-        qd_http_listener_decref(li);
     }
 }
 
@@ -317,6 +329,7 @@ static void _setup_client_connection(qdr_http1_connection_t *hconn)
                                             0,      // bind context
                                             0);     // bind token
     qdr_connection_set_context(hconn->qdr_conn, hconn);
+    hconn->oper_status = QD_CONN_OPER_UP;
 
     qd_log(hconn->adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP connection to client created", hconn->conn_id);
 
@@ -460,6 +473,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
         sys_mutex_unlock(qdr_http1_adaptor->lock);
         // at this point the core can no longer activate this connection
 
+        hconn->oper_status = QD_CONN_OPER_DOWN;
         if (hconn->out_link) {
             qdr_link_set_context(hconn->out_link, 0);
             qdr_link_detach(hconn->out_link, QD_LOST, 0);
diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h
index 8296633..1d4a987 100644
--- a/src/adaptors/http1/http1_private.h
+++ b/src/adaptors/http1/http1_private.h
@@ -105,6 +105,7 @@ struct qdr_http1_request_base_t {
 };
 DEQ_DECLARE(qdr_http1_request_base_t, qdr_http1_request_list_t);
 
+
 // A single HTTP adaptor connection.
 //
 struct qdr_http1_connection_t {
@@ -118,6 +119,8 @@ struct qdr_http1_connection_t {
     uint64_t               conn_id;
     qd_handler_context_t   handler_context;
     h1_codec_connection_type_t     type;
+    qd_conn_admin_status_t         admin_status;
+    qd_conn_oper_status_t          oper_status;
 
     struct {
         char *host;
@@ -194,9 +197,6 @@ ALLOC_DECLARE(qdr_http1_connection_t);
 
 // http1_adaptor.c
 //
-//int qdr_http1_write_out_data(qdr_http1_connection_t *hconn);
-//void qdr_http1_write_buffer_list(qdr_http1_request_t *hreq, qd_buffer_list_t *blist);
-
 void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn);
 void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_list_t *fifo, qd_buffer_list_t *blist, uintmax_t octets);
 void qdr_http1_enqueue_stream_data(qdr_http1_out_data_list_t *fifo, qd_message_stream_data_t *stream_data);
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index c606a46..a2e93bd 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -152,6 +152,8 @@ static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *ct
 
     ZERO(hconn);
     hconn->type = HTTP1_CONN_SERVER;
+    hconn->admin_status = QD_CONN_ADMIN_ENABLED;
+    hconn->oper_status = QD_CONN_OPER_UP;
     hconn->qd_server = qd->server;
     hconn->adaptor = qdr_http1_adaptor;
     hconn->handler_context.handler = &_handle_connection_events;
@@ -216,6 +218,8 @@ static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *ct
 
 // Management Agent API - Create
 //
+// Note that this runs on the Management Agent thread, which may be running concurrently with the
+// I/O and timer threads.
 qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
 {
     qd_http_connector_t *c = qd_http_connector(qd->server);
@@ -257,6 +261,8 @@ qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *qd, const qd_ht
 
 // Management Agent API - Delete
 //
+// Note that this runs on the Management Agent thread, which may be running concurrently with the
+// I/O and timer threads.
 void qd_http1_delete_connector(qd_dispatch_t *ignored, qd_http_connector_t *ct)
 {
     if (ct) {
@@ -265,15 +271,17 @@ void qd_http1_delete_connector(qd_dispatch_t *ignored, qd_http_connector_t *ct)
         sys_mutex_lock(qdr_http1_adaptor->lock);
         DEQ_REMOVE(qdr_http1_adaptor->connectors, ct);
         qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) ct->ctx;
+        qdr_connection_t *qdr_conn = 0;
         if (hconn) {
+            hconn->admin_status = QD_CONN_ADMIN_DELETED;
             hconn->server.connector = 0;
             ct->ctx = 0;
-            if (hconn->qdr_conn)
-                // have the core close this connection
-                qdr_core_close_connection(hconn->qdr_conn);
+            qdr_conn = hconn->qdr_conn;
         }
         sys_mutex_unlock(qdr_http1_adaptor->lock);
 
+        if (qdr_conn)
+            qdr_core_close_connection(qdr_conn);
         qd_http_connector_decref(ct);
     }
 }
@@ -435,25 +443,28 @@ static void _do_reconnect(void *context)
 
     _process_request((_server_request_t*) DEQ_HEAD(hconn->requests));
 
-    // Do not attempt to re-connect if the current request is still in
-    // progress. This happens when the server has closed the connection before
-    // the request message has fully arrived (!rx_complete).
-    // qdr_connection_process() will continue to invoke the
-    // qdr_http1_server_core_link_deliver callback until the request message is
-    // complete.
-
-    // false positive: head request is removed before it is freed, null is passed
-    /* coverity[pass_freed_arg] */
-    if (!_is_request_in_progress((_server_request_t*) DEQ_HEAD(hconn->requests))) {
-        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
-               "[C%"PRIu64"] Connecting to HTTP server...", conn_id);
-        sys_mutex_lock(qdr_http1_adaptor->lock);
-        hconn->raw_conn = pn_raw_connection();
-        pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
-        // this next call may immediately reschedule the connection on another I/O
-        // thread. After this call hconn may no longer be valid!
-        pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port);
-        sys_mutex_unlock(qdr_http1_adaptor->lock);
+    if (hconn->admin_status == QD_CONN_ADMIN_ENABLED) {
+
+        // Do not attempt to re-connect if the current request is still in
+        // progress. This happens when the server has closed the connection before
+        // the request message has fully arrived (!rx_complete).
+        // qdr_connection_process() will continue to invoke the
+        // qdr_http1_server_core_link_deliver callback until the request message is
+        // complete.
+
+        // false positive: head request is removed before it is freed, null is passed
+        /* coverity[pass_freed_arg] */
+        if (!_is_request_in_progress((_server_request_t*) DEQ_HEAD(hconn->requests))) {
+            qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+                   "[C%"PRIu64"] Connecting to HTTP server...", conn_id);
+            sys_mutex_lock(qdr_http1_adaptor->lock);
+            hconn->raw_conn = pn_raw_connection();
+            pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
+            // this next call may immediately reschedule the connection on another I/O
+            // thread. After this call hconn may no longer be valid!
+            pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port);
+            sys_mutex_unlock(qdr_http1_adaptor->lock);
+        }
     }
 }
 
@@ -584,7 +595,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
         //
 
         bool reconnect = false;
-        if (hconn->qdr_conn) {
+        if (hconn->admin_status == QD_CONN_ADMIN_ENABLED && hconn->qdr_conn) {
             if (hconn->server.link_timeout == 0) {
                 hconn->server.link_timeout = qd_timer_now() + LINK_TIMEOUT_MSEC;
                 hconn->server.reconnect_pause = 0;
@@ -600,13 +611,15 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
         // prevent core activation
         sys_mutex_lock(qdr_http1_adaptor->lock);
         hconn->raw_conn = 0;
-        if (reconnect && hconn->server.reconnect_timer)
+        if (reconnect && hconn->server.reconnect_timer) {
             qd_timer_schedule(hconn->server.reconnect_timer, hconn->server.reconnect_pause);
+            sys_mutex_unlock(qdr_http1_adaptor->lock);
+            // do not manipulate hconn further as it may now be processed by the
+            // timer thread
+            return;
+        }
         sys_mutex_unlock(qdr_http1_adaptor->lock);
-
-        // do not manipulate hconn further as it may now be processed by the
-        // timer thread
-        return;
+        break;
     }
     case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
         _send_request_message((_server_request_t*) DEQ_HEAD(hconn->requests));
@@ -1722,8 +1735,10 @@ void qdr_http1_server_core_conn_close(qdr_http1_adaptor_t *adaptor,
     sys_mutex_unlock(qdr_http1_adaptor->lock);
     // the core thread can no longer activate this connection
 
+    hconn->oper_status = QD_CONN_OPER_DOWN;
+    _teardown_server_links(hconn);
     qdr_connection_closed(qdr_conn);
-    qdr_http1_close_connection(hconn, "Connection closed by management");
+    qdr_http1_close_connection(hconn, error);
 
     // it is expected that this callback is the final callback before returning
     // from qdr_connection_process(). Free hconn when qdr_connection_process returns.
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 4f28d48..a5cb937 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -102,8 +102,8 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t                   *core,
     conn->policy_spec           = policy_spec;
     conn->link_capacity         = link_capacity;
     conn->mask_bit              = -1;
-    conn->admin_status          = QDR_CONN_ADMIN_ENABLED;
-    conn->oper_status           = QDR_CONN_OPER_UP;
+    conn->admin_status          = QD_CONN_ADMIN_ENABLED;
+    conn->oper_status           = QD_CONN_OPER_UP;
     DEQ_INIT(conn->links);
     DEQ_INIT(conn->work_list);
     DEQ_INIT(conn->streaming_link_pool);
@@ -279,7 +279,7 @@ void qdr_close_connection_CT(qdr_core_t *core, qdr_connection_t  *conn)
 {
     conn->closed = true;
     conn->error  = qdr_error(QD_AMQP_COND_CONNECTION_FORCED, "Connection forced-closed by management request");
-    conn->admin_status = QDR_CONN_ADMIN_DELETED;
+    conn->admin_status = QD_CONN_ADMIN_DELETED;
 
     //Activate the connection, so the I/O threads can finish the job.
     qdr_connection_activate_CT(core, conn);
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index c08753a..e4211ba 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -654,17 +654,6 @@ ALLOC_DECLARE(qdr_connection_info_t);
 DEQ_DECLARE(qdr_link_route_t, qdr_link_route_list_t);
 
 
-typedef enum {
-    QDR_CONN_OPER_UP,
-} qdr_conn_oper_status_t;
-
-
-typedef enum {
-    QDR_CONN_ADMIN_ENABLED,
-    QDR_CONN_ADMIN_DELETED
-} qdr_conn_admin_status_t;
-
-
 struct qdr_connection_t {
     DEQ_LINKS(qdr_connection_t);
     DEQ_LINKS_N(ACTIVATE, qdr_connection_t);
@@ -690,8 +679,8 @@ struct qdr_connection_t {
     qdr_connection_info_t      *connection_info;
     void                       *user_context; /* Updated from IO thread, use work_lock */
     qdr_link_route_list_t       conn_link_routes;  // connection scoped link routes
-    qdr_conn_oper_status_t      oper_status;
-    qdr_conn_admin_status_t     admin_status;
+    qd_conn_oper_status_t       oper_status;
+    qd_conn_admin_status_t      admin_status;
     qdr_error_t                *error;
     bool                        closed; // This bit is used in the case where a client is trying to force close this connection.
     uint32_t                    conn_uptime; // Timestamp which can be used to calculate the number of seconds this connection has been up and running.
diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py
index 2f2117b..986a265 100644
--- a/tests/system_tests_http1_adaptor.py
+++ b/tests/system_tests_http1_adaptor.py
@@ -43,85 +43,106 @@ from http1_tests import Http1CurlTestsMixIn
 
 class Http1AdaptorManagementTest(TestCase):
     """
-    Test Creation and deletion of HTTP1 management entities
+    Test Creation and deletion of HTTP1 management entities.
     """
     @classmethod
     def setUpClass(cls):
         super(Http1AdaptorManagementTest, cls).setUpClass()
 
+        cls.LISTENER_TYPE = 'org.apache.qpid.dispatch.httpListener'
+        cls.CONNECTOR_TYPE = 'org.apache.qpid.dispatch.httpConnector'
+        cls.CONNECTION_TYPE = 'org.apache.qpid.dispatch.connection'
+
+        cls.interior_edge_port = cls.tester.get_port()
+        cls.interior_mgmt_port = cls.tester.get_port()
+        cls.edge_mgmt_port = cls.tester.get_port()
+
         cls.http_server_port = cls.tester.get_port()
         cls.http_listener_port = cls.tester.get_port()
 
-        config = [
-            ('router', {'mode': 'standalone',
-                        'id': 'HTTP1MgmtTest',
-                        'allowUnsettledMulticast': 'yes'}),
+        i_config = [
+            ('router', {'mode': 'interior',
+                        'id': 'HTTP1MgmtTestInterior'}),
             ('listener', {'role': 'normal',
-                          'port': cls.tester.get_port()}),
+                          'port': cls.interior_mgmt_port}),
+            ('listener', {'role': 'edge', 'port': cls.interior_edge_port}),
             ('address', {'prefix': 'closest',   'distribution': 'closest'}),
             ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
         ]
+        config = Qdrouterd.Config(i_config)
+        cls.i_router = cls.tester.qdrouterd('HTTP1MgmtTestInterior', config, wait=False)
 
-        config = Qdrouterd.Config(config)
-        cls.router = cls.tester.qdrouterd('HTTP1MgmtTest', config, wait=True)
-
-    def test_01_mgmt(self):
-        """
-        Create and delete HTTP1 connectors and listeners
-        """
-        LISTENER_TYPE = 'org.apache.qpid.dispatch.httpListener'
-        CONNECTOR_TYPE = 'org.apache.qpid.dispatch.httpConnector'
-        CONNECTION_TYPE = 'org.apache.qpid.dispatch.connection'
-
-        mgmt = self.router.management
-        self.assertEqual(0, len(mgmt.query(type=LISTENER_TYPE).results))
-        self.assertEqual(0, len(mgmt.query(type=CONNECTOR_TYPE).results))
-
-        mgmt.create(type=CONNECTOR_TYPE,
-                    name="ServerConnector",
-                    attributes={'address': 'http1',
-                                'port': self.http_server_port,
-                                'protocolVersion': 'HTTP1'})
-
-        mgmt.create(type=LISTENER_TYPE,
-                    name="ClientListener",
-                    attributes={'address': 'http1',
-                                'port': self.http_listener_port,
-                                'protocolVersion': 'HTTP1'})
+        e_config = [
+            ('router', {'mode': 'edge',
+                        'id': 'HTTP1MgmtTestEdge'}),
+            ('listener', {'role': 'normal',
+                          'port': cls.edge_mgmt_port}),
+            ('connector', {'name': 'edge', 'role': 'edge',
+                           'port': cls.interior_edge_port}),
+            ('address', {'prefix': 'closest',   'distribution': 'closest'}),
+            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+        ]
+        config = Qdrouterd.Config(e_config)
+        cls.e_router = cls.tester.qdrouterd('HTTP1MgmtTestEdge', config,
+                                            wait=False)
+
+        cls.i_router.wait_ready()
+        cls.e_router.wait_ready()
+
+    def test_01_create_delete(self):
+        """ Create and delete HTTP1 connectors and listeners.  The
+        connectors/listeners are created on the edge router.  Verify that the
+        adaptor properly notifies the interior of the subscribers/producers.
+        """
+        e_mgmt = self.e_router.management
+        self.assertEqual(0, len(e_mgmt.query(type=self.LISTENER_TYPE).results))
+        self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
+
+        e_mgmt.create(type=self.CONNECTOR_TYPE,
+                      name="ServerConnector",
+                      attributes={'address': 'closest/http1Service',
+                                  'port': self.http_server_port,
+                                  'protocolVersion': 'HTTP1'})
+
+        e_mgmt.create(type=self.LISTENER_TYPE,
+                      name="ClientListener",
+                      attributes={'address': 'closest/http1Service',
+                                  'port': self.http_listener_port,
+                                  'protocolVersion': 'HTTP1'})
 
         # verify the entities have been created and http traffic works
 
-        self.assertEqual(1, len(mgmt.query(type=LISTENER_TYPE).results))
-        self.assertEqual(1, len(mgmt.query(type=CONNECTOR_TYPE).results))
+        self.assertEqual(1, len(e_mgmt.query(type=self.LISTENER_TYPE).results))
+        self.assertEqual(1, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
 
         count, error = http1_ping(sport=self.http_server_port,
                                   cport=self.http_listener_port)
         self.assertIsNone(error)
         self.assertEqual(1, count)
 
+        # now check the interior router for the closest/http1Service address
+        self.i_router.wait_address("closest/http1Service", subscribers=1)
+
         #
-        # delete the connector and wait for the associated connection to be
-        # removed
+        # delete the connector and listener; wait for the associated connection
+        # to be removed
         #
+        e_mgmt.delete(type=self.CONNECTOR_TYPE, name="ServerConnector")
+        self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
+        e_mgmt.delete(type=self.LISTENER_TYPE, name="ClientListener")
+        self.assertEqual(0, len(e_mgmt.query(type=self.LISTENER_TYPE).results))
 
-        mgmt.delete(type=CONNECTOR_TYPE, name="ServerConnector")
-        self.assertEqual(0, len(mgmt.query(type=CONNECTOR_TYPE).results))
-
-        retry = 20  # 20 * 0.25 = 5 sec
-        hconns = 0
-        while retry:
-            obj = mgmt.query(type=CONNECTION_TYPE,
-                             attribute_names=["protocol"])
+        # will hit test timeout on failure:
+        while True:
+            hconns = 0
+            obj = e_mgmt.query(type=self.CONNECTION_TYPE,
+                               attribute_names=["protocol"])
             for item in obj.get_dicts():
                 if "http/1.x" in item["protocol"]:
                     hconns += 1
             if hconns == 0:
                 break
             sleep(0.25)
-            retry -= 1
-            hconns = 0
-
-        self.assertEqual(0, hconns, msg="HTTP connection not deleted")
 
         # When a connector is configured the router will periodically attempt
         # to connect to the server address. To prove that the connector has
@@ -137,22 +158,88 @@ class Http1AdaptorManagementTest(TestCase):
             conn, addr = s.accept()
         s.close()
 
+        # Verify that the address is no longer bound on the interior
+        self.i_router.wait_address_unsubscribed("closest/http1Service")
+
         #
-        # re-create the connector and verify it works
+        # re-create the connector and listener; verify it works
         #
-        mgmt.create(type=CONNECTOR_TYPE,
-                    name="ServerConnector",
-                    attributes={'address': 'http1',
-                                'port': self.http_server_port,
-                                'protocolVersion': 'HTTP1'})
+        e_mgmt.create(type=self.CONNECTOR_TYPE,
+                      name="ServerConnector",
+                      attributes={'address': 'closest/http1Service',
+                                  'port': self.http_server_port,
+                                  'protocolVersion': 'HTTP1'})
+
+        e_mgmt.create(type=self.LISTENER_TYPE,
+                      name="ClientListener",
+                      attributes={'address': 'closest/http1Service',
+                                  'port': self.http_listener_port,
+                                  'protocolVersion': 'HTTP1'})
 
-        self.assertEqual(1, len(mgmt.query(type=CONNECTOR_TYPE).results))
+        self.assertEqual(1, len(e_mgmt.query(type=self.LISTENER_TYPE).results))
+        self.assertEqual(1, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
 
         count, error = http1_ping(sport=self.http_server_port,
                                   cport=self.http_listener_port)
         self.assertIsNone(error)
         self.assertEqual(1, count)
 
+        self.i_router.wait_address("closest/http1Service", subscribers=1)
+
+        e_mgmt.delete(type=self.CONNECTOR_TYPE, name="ServerConnector")
+        self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
+        e_mgmt.delete(type=self.LISTENER_TYPE, name="ClientListener")
+        self.assertEqual(0, len(e_mgmt.query(type=self.LISTENER_TYPE).results))
+
+    def test_01_delete_active_connector(self):
+        """Delete an HTTP1 connector that is currently connected to a server.
+        Verify the connection is dropped.
+        """
+        e_mgmt = self.e_router.management
+        self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
+
+        e_mgmt.create(type=self.CONNECTOR_TYPE,
+                      name="ServerConnector",
+                      attributes={'address': 'closest/http1Service',
+                                  'port': self.http_server_port,
+                                  'protocolVersion': 'HTTP1'})
+
+        # verify the connector has been created and attach a dummy server
+        self.assertEqual(1, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
+
+        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        server.bind(("", self.http_server_port))
+        server.setblocking(True)
+        server.settimeout(5)
+        server.listen(1)
+        conn, _ = server.accept()
+        server.close()
+
+        # now check the interior router for the closest/http1Service address
+        self.i_router.wait_address("closest/http1Service", subscribers=1)
+
+        # delete the connector
+        e_mgmt.delete(type=self.CONNECTOR_TYPE, name="ServerConnector")
+        self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
+
+        # expect socket to close
+        while True:
+            try:
+                rd, _, _ = select.select([conn], [], [])
+            except select.error as serror:
+                if serror[0] == errno.EINTR:
+                    print("ignoring interrupt from select(): %s" % str(serror))
+                    continue
+                raise  # assuming fatal...
+            if len(conn.recv(10)) == 0:
+                break
+
+        conn.close()
+
+        # Verify that the address is no longer bound on the interior
+        self.i_router.wait_address_unsubscribed("closest/http1Service")
+
 
 class Http1AdaptorOneRouterTest(Http1OneRouterTestBase,
                                 CommonHttp1OneRouterTest):

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org