You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2021/10/28 18:09:52 UTC

[qpid-dispatch] 01/03: DISPATCH-2260: HTTP/1.x: fix deletion of httpConnector and httpListener

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch 1.17.x
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 1dfc140ff6d05027da4a0fb69138f1e280cb9b41
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Mon Oct 25 15:32:43 2021 -0400

    DISPATCH-2260: HTTP/1.x: fix deletion of httpConnector and httpListener
    
    This closes #1394
---
 include/qpid/dispatch/protocol_adaptor.h |  12 ++
 src/adaptors/http1/http1_adaptor.c       |  19 +--
 src/adaptors/http1/http1_client.c        |  26 +++-
 src/adaptors/http1/http1_private.h       |   6 +-
 src/adaptors/http1/http1_server.c        |  73 +++++++-----
 src/router_core/connections.c            |   6 +-
 src/router_core/router_core_private.h    |  15 +--
 tests/system_tests_http1_adaptor.py      | 197 ++++++++++++++++++++++---------
 8 files changed, 237 insertions(+), 117 deletions(-)

diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h
index 654fe74..00551c2 100644
--- a/include/qpid/dispatch/protocol_adaptor.h
+++ b/include/qpid/dispatch/protocol_adaptor.h
@@ -323,6 +323,18 @@ void qdr_protocol_adaptor_free(qdr_core_t *core, qdr_protocol_adaptor_t *adaptor
  */
 
 typedef enum {
+    QD_CONN_OPER_UP,       ///< Operationally up; assigned by qdr_connection_opened() for new core connections
+    QD_CONN_OPER_DOWN,     ///< Operationally down; the HTTP/1.x adaptor assigns this when a connection closes
+} qd_conn_oper_status_t;
+
+
+typedef enum {
+    QD_CONN_ADMIN_ENABLED, ///< Normal state; assigned when a connection is created
+    QD_CONN_ADMIN_DELETED  ///< Management requested removal (connector deleted or connection force-closed)
+} qd_conn_admin_status_t;
+
+
+typedef enum {
     QD_LINK_ENDPOINT,      ///< A link to a connected endpoint
     QD_LINK_CONTROL,       ///< A link to a peer router for control messages
     QD_LINK_ROUTER,        ///< A link to a peer router for routed messages
diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
index 64b2a17..3828e89 100644
--- a/src/adaptors/http1/http1_adaptor.c
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -405,7 +405,8 @@ void qdr_http1_q2_unblocked_handler(const qd_alloc_safe_ptr_t context)
 //
 
 
-// Invoked by the core thread to wake an I/O thread for the connection
+// Invoked by the core/mgmt thread to wake an I/O thread for the connection.
+// Must be thread safe.
 //
 static void _core_connection_activate_CT(void *context, qdr_connection_t *conn)
 {
@@ -670,21 +671,23 @@ static void qd_http1_adaptor_final(void *adaptor_context)
     qdr_http1_adaptor_t *adaptor = (qdr_http1_adaptor_t*) adaptor_context;
     qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor);
 
+    qdr_http1_connection_t *hconn = DEQ_HEAD(adaptor->connections);
+    while (hconn) {
+        qdr_http1_connection_free(hconn);
+        hconn = DEQ_HEAD(adaptor->connections);
+    }
     qd_http_listener_t *li = DEQ_HEAD(adaptor->listeners);
     while (li) {
-        qd_http1_delete_listener(0, li);
+        DEQ_REMOVE_HEAD(qdr_http1_adaptor->listeners);
+        qd_http_listener_decref(li);
         li = DEQ_HEAD(adaptor->listeners);
     }
     qd_http_connector_t *ct = DEQ_HEAD(adaptor->connectors);
     while (ct) {
-        qd_http1_delete_connector(0, ct);
+        DEQ_REMOVE_HEAD(qdr_http1_adaptor->connectors);
+        qd_http_connector_decref(ct);
         ct = DEQ_HEAD(adaptor->connectors);
     }
-    qdr_http1_connection_t *hconn = DEQ_HEAD(adaptor->connections);
-    while (hconn) {
-        qdr_http1_connection_free(hconn);
-        hconn = DEQ_HEAD(adaptor->connections);
-    }
 
     sys_mutex_free(adaptor->lock);
     qdr_http1_adaptor =  NULL;
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 9a6b845..fa73560 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -132,6 +132,8 @@ static qdr_http1_connection_t *_create_client_connection(qd_http_listener_t *li)
 
     ZERO(hconn);
     hconn->type = HTTP1_CONN_CLIENT;
+    hconn->admin_status = QD_CONN_ADMIN_ENABLED;
+    hconn->oper_status = QD_CONN_OPER_DOWN;
     hconn->qd_server = li->server;
     hconn->adaptor = qdr_http1_adaptor;
     hconn->handler_context.handler = &_handle_connection_events;
@@ -219,8 +221,14 @@ static void _handle_listener_events(pn_event_t *e, qd_server_t *qd_server, void
             } else {
                 qd_log(log, QD_LOG_TRACE, "Listener closed on %s", host_port);
             }
+
+            sys_mutex_lock(qdr_http1_adaptor->lock);
             pn_listener_set_context(li->pn_listener, 0);
             li->pn_listener = 0;
+            DEQ_REMOVE(qdr_http1_adaptor->listeners, li);
+            sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+            qd_http_listener_decref(li);
         }
         break;
     }
@@ -233,6 +241,9 @@ static void _handle_listener_events(pn_event_t *e, qd_server_t *qd_server, void
 
 // Management Agent API - Create
 //
+// Note that this runs on the Management Agent thread, which may be running concurrently with the
+// I/O and timer threads.
+//
 qd_http_listener_t *qd_http1_configure_listener(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
 {
     qd_http_listener_t *li = qd_http_listener(qd->server, &_handle_listener_events);
@@ -256,19 +267,20 @@ qd_http_listener_t *qd_http1_configure_listener(qd_dispatch_t *qd, const qd_http
 
 // Management Agent API - Delete
 //
+// Note that this runs on the Management Agent thread, which may be running concurrently with the
+// I/O and timer threads.
+//
 void qd_http1_delete_listener(qd_dispatch_t *ignore, qd_http_listener_t *li)
 {
     if (li) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_INFO, "Deleting HttpListener for %s, %s:%s", li->config.address, li->config.host, li->config.port);
+        sys_mutex_lock(qdr_http1_adaptor->lock);
         if (li->pn_listener) {
+            // note that the proactor may immediately schedule the
+            // PN_LISTENER_CLOSED event on another thread...
             pn_listener_close(li->pn_listener);
-            li->pn_listener = 0;
         }
-        sys_mutex_lock(qdr_http1_adaptor->lock);
-        DEQ_REMOVE(qdr_http1_adaptor->listeners, li);
         sys_mutex_unlock(qdr_http1_adaptor->lock);
-
-        qd_log(qdr_http1_adaptor->log, QD_LOG_INFO, "Deleted HttpListener for %s, %s:%s", li->config.address, li->config.host, li->config.port);
-        qd_http_listener_decref(li);
     }
 }
 
@@ -317,6 +329,7 @@ static void _setup_client_connection(qdr_http1_connection_t *hconn)
                                             0,      // bind context
                                             0);     // bind token
     qdr_connection_set_context(hconn->qdr_conn, hconn);
+    hconn->oper_status = QD_CONN_OPER_UP;
 
     qd_log(hconn->adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP connection to client created", hconn->conn_id);
 
@@ -460,6 +473,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
         sys_mutex_unlock(qdr_http1_adaptor->lock);
         // at this point the core can no longer activate this connection
 
+        hconn->oper_status = QD_CONN_OPER_DOWN;
         if (hconn->out_link) {
             qdr_link_set_context(hconn->out_link, 0);
             qdr_link_detach(hconn->out_link, QD_LOST, 0);
diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h
index 8296633..1d4a987 100644
--- a/src/adaptors/http1/http1_private.h
+++ b/src/adaptors/http1/http1_private.h
@@ -105,6 +105,7 @@ struct qdr_http1_request_base_t {
 };
 DEQ_DECLARE(qdr_http1_request_base_t, qdr_http1_request_list_t);
 
+
 // A single HTTP adaptor connection.
 //
 struct qdr_http1_connection_t {
@@ -118,6 +119,8 @@ struct qdr_http1_connection_t {
     uint64_t               conn_id;
     qd_handler_context_t   handler_context;
     h1_codec_connection_type_t     type;
+    qd_conn_admin_status_t         admin_status;
+    qd_conn_oper_status_t          oper_status;
 
     struct {
         char *host;
@@ -194,9 +197,6 @@ ALLOC_DECLARE(qdr_http1_connection_t);
 
 // http1_adaptor.c
 //
-//int qdr_http1_write_out_data(qdr_http1_connection_t *hconn);
-//void qdr_http1_write_buffer_list(qdr_http1_request_t *hreq, qd_buffer_list_t *blist);
-
 void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn);
 void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_list_t *fifo, qd_buffer_list_t *blist, uintmax_t octets);
 void qdr_http1_enqueue_stream_data(qdr_http1_out_data_list_t *fifo, qd_message_stream_data_t *stream_data);
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index c606a46..a2e93bd 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -152,6 +152,8 @@ static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *ct
 
     ZERO(hconn);
     hconn->type = HTTP1_CONN_SERVER;
+    hconn->admin_status = QD_CONN_ADMIN_ENABLED;
+    hconn->oper_status = QD_CONN_OPER_UP;
     hconn->qd_server = qd->server;
     hconn->adaptor = qdr_http1_adaptor;
     hconn->handler_context.handler = &_handle_connection_events;
@@ -216,6 +218,8 @@ static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *ct
 
 // Management Agent API - Create
 //
+// Note that this runs on the Management Agent thread, which may be running concurrently with the
+// I/O and timer threads.
 qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
 {
     qd_http_connector_t *c = qd_http_connector(qd->server);
@@ -257,6 +261,8 @@ qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *qd, const qd_ht
 
 // Management Agent API - Delete
 //
+// Note that this runs on the Management Agent thread, which may be running concurrently with the
+// I/O and timer threads.
 void qd_http1_delete_connector(qd_dispatch_t *ignored, qd_http_connector_t *ct)
 {
     if (ct) {
@@ -265,15 +271,17 @@ void qd_http1_delete_connector(qd_dispatch_t *ignored, qd_http_connector_t *ct)
         sys_mutex_lock(qdr_http1_adaptor->lock);
         DEQ_REMOVE(qdr_http1_adaptor->connectors, ct);
         qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) ct->ctx;
+        qdr_connection_t *qdr_conn = 0;
         if (hconn) {
+            hconn->admin_status = QD_CONN_ADMIN_DELETED;
             hconn->server.connector = 0;
             ct->ctx = 0;
-            if (hconn->qdr_conn)
-                // have the core close this connection
-                qdr_core_close_connection(hconn->qdr_conn);
+            qdr_conn = hconn->qdr_conn;
         }
         sys_mutex_unlock(qdr_http1_adaptor->lock);
 
+        if (qdr_conn)
+            qdr_core_close_connection(qdr_conn);
         qd_http_connector_decref(ct);
     }
 }
@@ -435,25 +443,28 @@ static void _do_reconnect(void *context)
 
     _process_request((_server_request_t*) DEQ_HEAD(hconn->requests));
 
-    // Do not attempt to re-connect if the current request is still in
-    // progress. This happens when the server has closed the connection before
-    // the request message has fully arrived (!rx_complete).
-    // qdr_connection_process() will continue to invoke the
-    // qdr_http1_server_core_link_deliver callback until the request message is
-    // complete.
-
-    // false positive: head request is removed before it is freed, null is passed
-    /* coverity[pass_freed_arg] */
-    if (!_is_request_in_progress((_server_request_t*) DEQ_HEAD(hconn->requests))) {
-        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
-               "[C%"PRIu64"] Connecting to HTTP server...", conn_id);
-        sys_mutex_lock(qdr_http1_adaptor->lock);
-        hconn->raw_conn = pn_raw_connection();
-        pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
-        // this next call may immediately reschedule the connection on another I/O
-        // thread. After this call hconn may no longer be valid!
-        pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port);
-        sys_mutex_unlock(qdr_http1_adaptor->lock);
+    if (hconn->admin_status == QD_CONN_ADMIN_ENABLED) {
+
+        // Do not attempt to re-connect if the current request is still in
+        // progress. This happens when the server has closed the connection before
+        // the request message has fully arrived (!rx_complete).
+        // qdr_connection_process() will continue to invoke the
+        // qdr_http1_server_core_link_deliver callback until the request message is
+        // complete.
+
+        // false positive: head request is removed before it is freed, null is passed
+        /* coverity[pass_freed_arg] */
+        if (!_is_request_in_progress((_server_request_t*) DEQ_HEAD(hconn->requests))) {
+            qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+                   "[C%"PRIu64"] Connecting to HTTP server...", conn_id);
+            sys_mutex_lock(qdr_http1_adaptor->lock);
+            hconn->raw_conn = pn_raw_connection();
+            pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
+            // this next call may immediately reschedule the connection on another I/O
+            // thread. After this call hconn may no longer be valid!
+            pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port);
+            sys_mutex_unlock(qdr_http1_adaptor->lock);
+        }
     }
 }
 
@@ -584,7 +595,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
         //
 
         bool reconnect = false;
-        if (hconn->qdr_conn) {
+        if (hconn->admin_status == QD_CONN_ADMIN_ENABLED && hconn->qdr_conn) {
             if (hconn->server.link_timeout == 0) {
                 hconn->server.link_timeout = qd_timer_now() + LINK_TIMEOUT_MSEC;
                 hconn->server.reconnect_pause = 0;
@@ -600,13 +611,15 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
         // prevent core activation
         sys_mutex_lock(qdr_http1_adaptor->lock);
         hconn->raw_conn = 0;
-        if (reconnect && hconn->server.reconnect_timer)
+        if (reconnect && hconn->server.reconnect_timer) {
             qd_timer_schedule(hconn->server.reconnect_timer, hconn->server.reconnect_pause);
+            sys_mutex_unlock(qdr_http1_adaptor->lock);
+            // do not manipulate hconn further as it may now be processed by the
+            // timer thread
+            return;
+        }
         sys_mutex_unlock(qdr_http1_adaptor->lock);
-
-        // do not manipulate hconn further as it may now be processed by the
-        // timer thread
-        return;
+        break;
     }
     case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
         _send_request_message((_server_request_t*) DEQ_HEAD(hconn->requests));
@@ -1722,8 +1735,10 @@ void qdr_http1_server_core_conn_close(qdr_http1_adaptor_t *adaptor,
     sys_mutex_unlock(qdr_http1_adaptor->lock);
     // the core thread can no longer activate this connection
 
+    hconn->oper_status = QD_CONN_OPER_DOWN;
+    _teardown_server_links(hconn);
     qdr_connection_closed(qdr_conn);
-    qdr_http1_close_connection(hconn, "Connection closed by management");
+    qdr_http1_close_connection(hconn, error);
 
     // it is expected that this callback is the final callback before returning
     // from qdr_connection_process(). Free hconn when qdr_connection_process returns.
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 4f28d48..a5cb937 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -102,8 +102,8 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t                   *core,
     conn->policy_spec           = policy_spec;
     conn->link_capacity         = link_capacity;
     conn->mask_bit              = -1;
-    conn->admin_status          = QDR_CONN_ADMIN_ENABLED;
-    conn->oper_status           = QDR_CONN_OPER_UP;
+    conn->admin_status          = QD_CONN_ADMIN_ENABLED;
+    conn->oper_status           = QD_CONN_OPER_UP;
     DEQ_INIT(conn->links);
     DEQ_INIT(conn->work_list);
     DEQ_INIT(conn->streaming_link_pool);
@@ -279,7 +279,7 @@ void qdr_close_connection_CT(qdr_core_t *core, qdr_connection_t  *conn)
 {
     conn->closed = true;
     conn->error  = qdr_error(QD_AMQP_COND_CONNECTION_FORCED, "Connection forced-closed by management request");
-    conn->admin_status = QDR_CONN_ADMIN_DELETED;
+    conn->admin_status = QD_CONN_ADMIN_DELETED;
 
     //Activate the connection, so the I/O threads can finish the job.
     qdr_connection_activate_CT(core, conn);
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index c08753a..e4211ba 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -654,17 +654,6 @@ ALLOC_DECLARE(qdr_connection_info_t);
 DEQ_DECLARE(qdr_link_route_t, qdr_link_route_list_t);
 
 
-typedef enum {
-    QDR_CONN_OPER_UP,
-} qdr_conn_oper_status_t;
-
-
-typedef enum {
-    QDR_CONN_ADMIN_ENABLED,
-    QDR_CONN_ADMIN_DELETED
-} qdr_conn_admin_status_t;
-
-
 struct qdr_connection_t {
     DEQ_LINKS(qdr_connection_t);
     DEQ_LINKS_N(ACTIVATE, qdr_connection_t);
@@ -690,8 +679,8 @@ struct qdr_connection_t {
     qdr_connection_info_t      *connection_info;
     void                       *user_context; /* Updated from IO thread, use work_lock */
     qdr_link_route_list_t       conn_link_routes;  // connection scoped link routes
-    qdr_conn_oper_status_t      oper_status;
-    qdr_conn_admin_status_t     admin_status;
+    qd_conn_oper_status_t       oper_status;
+    qd_conn_admin_status_t      admin_status;
     qdr_error_t                *error;
     bool                        closed; // This bit is used in the case where a client is trying to force close this connection.
     uint32_t                    conn_uptime; // Timestamp which can be used to calculate the number of seconds this connection has been up and running.
diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py
index 2f2117b..986a265 100644
--- a/tests/system_tests_http1_adaptor.py
+++ b/tests/system_tests_http1_adaptor.py
@@ -43,85 +43,106 @@ from http1_tests import Http1CurlTestsMixIn
 
 class Http1AdaptorManagementTest(TestCase):
     """
-    Test Creation and deletion of HTTP1 management entities
+    Test Creation and deletion of HTTP1 management entities.
     """
     @classmethod
     def setUpClass(cls):
         super(Http1AdaptorManagementTest, cls).setUpClass()
 
+        cls.LISTENER_TYPE = 'org.apache.qpid.dispatch.httpListener'
+        cls.CONNECTOR_TYPE = 'org.apache.qpid.dispatch.httpConnector'
+        cls.CONNECTION_TYPE = 'org.apache.qpid.dispatch.connection'
+
+        cls.interior_edge_port = cls.tester.get_port()
+        cls.interior_mgmt_port = cls.tester.get_port()
+        cls.edge_mgmt_port = cls.tester.get_port()
+
         cls.http_server_port = cls.tester.get_port()
         cls.http_listener_port = cls.tester.get_port()
 
-        config = [
-            ('router', {'mode': 'standalone',
-                        'id': 'HTTP1MgmtTest',
-                        'allowUnsettledMulticast': 'yes'}),
+        i_config = [
+            ('router', {'mode': 'interior',
+                        'id': 'HTTP1MgmtTestInterior'}),
             ('listener', {'role': 'normal',
-                          'port': cls.tester.get_port()}),
+                          'port': cls.interior_mgmt_port}),
+            ('listener', {'role': 'edge', 'port': cls.interior_edge_port}),
             ('address', {'prefix': 'closest',   'distribution': 'closest'}),
             ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
         ]
+        config = Qdrouterd.Config(i_config)
+        cls.i_router = cls.tester.qdrouterd('HTTP1MgmtTestInterior', config, wait=False)
 
-        config = Qdrouterd.Config(config)
-        cls.router = cls.tester.qdrouterd('HTTP1MgmtTest', config, wait=True)
-
-    def test_01_mgmt(self):
-        """
-        Create and delete HTTP1 connectors and listeners
-        """
-        LISTENER_TYPE = 'org.apache.qpid.dispatch.httpListener'
-        CONNECTOR_TYPE = 'org.apache.qpid.dispatch.httpConnector'
-        CONNECTION_TYPE = 'org.apache.qpid.dispatch.connection'
-
-        mgmt = self.router.management
-        self.assertEqual(0, len(mgmt.query(type=LISTENER_TYPE).results))
-        self.assertEqual(0, len(mgmt.query(type=CONNECTOR_TYPE).results))
-
-        mgmt.create(type=CONNECTOR_TYPE,
-                    name="ServerConnector",
-                    attributes={'address': 'http1',
-                                'port': self.http_server_port,
-                                'protocolVersion': 'HTTP1'})
-
-        mgmt.create(type=LISTENER_TYPE,
-                    name="ClientListener",
-                    attributes={'address': 'http1',
-                                'port': self.http_listener_port,
-                                'protocolVersion': 'HTTP1'})
+        e_config = [
+            ('router', {'mode': 'edge',
+                        'id': 'HTTP1MgmtTestEdge'}),
+            ('listener', {'role': 'normal',
+                          'port': cls.edge_mgmt_port}),
+            ('connector', {'name': 'edge', 'role': 'edge',
+                           'port': cls.interior_edge_port}),
+            ('address', {'prefix': 'closest',   'distribution': 'closest'}),
+            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+        ]
+        config = Qdrouterd.Config(e_config)
+        cls.e_router = cls.tester.qdrouterd('HTTP1MgmtTestEdge', config,
+                                            wait=False)
+
+        cls.i_router.wait_ready()
+        cls.e_router.wait_ready()
+
+    def test_01_create_delete(self):
+        """ Create and delete HTTP1 connectors and listeners.  The
+        connectors/listeners are created on the edge router.  Verify that the
+        adaptor properly notifies the interior of the subscribers/producers.
+        """
+        e_mgmt = self.e_router.management
+        self.assertEqual(0, len(e_mgmt.query(type=self.LISTENER_TYPE).results))
+        self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
+
+        e_mgmt.create(type=self.CONNECTOR_TYPE,
+                      name="ServerConnector",
+                      attributes={'address': 'closest/http1Service',
+                                  'port': self.http_server_port,
+                                  'protocolVersion': 'HTTP1'})
+
+        e_mgmt.create(type=self.LISTENER_TYPE,
+                      name="ClientListener",
+                      attributes={'address': 'closest/http1Service',
+                                  'port': self.http_listener_port,
+                                  'protocolVersion': 'HTTP1'})
 
         # verify the entities have been created and http traffic works
 
-        self.assertEqual(1, len(mgmt.query(type=LISTENER_TYPE).results))
-        self.assertEqual(1, len(mgmt.query(type=CONNECTOR_TYPE).results))
+        self.assertEqual(1, len(e_mgmt.query(type=self.LISTENER_TYPE).results))
+        self.assertEqual(1, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
 
         count, error = http1_ping(sport=self.http_server_port,
                                   cport=self.http_listener_port)
         self.assertIsNone(error)
         self.assertEqual(1, count)
 
+        # now check the interior router for the closest/http1Service address
+        self.i_router.wait_address("closest/http1Service", subscribers=1)
+
         #
-        # delete the connector and wait for the associated connection to be
-        # removed
+        # delete the connector and listener; wait for the associated connection
+        # to be removed
         #
+        e_mgmt.delete(type=self.CONNECTOR_TYPE, name="ServerConnector")
+        self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
+        e_mgmt.delete(type=self.LISTENER_TYPE, name="ClientListener")
+        self.assertEqual(0, len(e_mgmt.query(type=self.LISTENER_TYPE).results))
 
-        mgmt.delete(type=CONNECTOR_TYPE, name="ServerConnector")
-        self.assertEqual(0, len(mgmt.query(type=CONNECTOR_TYPE).results))
-
-        retry = 20  # 20 * 0.25 = 5 sec
-        hconns = 0
-        while retry:
-            obj = mgmt.query(type=CONNECTION_TYPE,
-                             attribute_names=["protocol"])
+        # will hit test timeout on failure:
+        while True:
+            hconns = 0
+            obj = e_mgmt.query(type=self.CONNECTION_TYPE,
+                               attribute_names=["protocol"])
             for item in obj.get_dicts():
                 if "http/1.x" in item["protocol"]:
                     hconns += 1
             if hconns == 0:
                 break
             sleep(0.25)
-            retry -= 1
-            hconns = 0
-
-        self.assertEqual(0, hconns, msg="HTTP connection not deleted")
 
         # When a connector is configured the router will periodically attempt
         # to connect to the server address. To prove that the connector has
@@ -137,22 +158,88 @@ class Http1AdaptorManagementTest(TestCase):
             conn, addr = s.accept()
         s.close()
 
+        # Verify that the address is no longer bound on the interior
+        self.i_router.wait_address_unsubscribed("closest/http1Service")
+
         #
-        # re-create the connector and verify it works
+        # re-create the connector and listener; verify it works
         #
-        mgmt.create(type=CONNECTOR_TYPE,
-                    name="ServerConnector",
-                    attributes={'address': 'http1',
-                                'port': self.http_server_port,
-                                'protocolVersion': 'HTTP1'})
+        e_mgmt.create(type=self.CONNECTOR_TYPE,
+                      name="ServerConnector",
+                      attributes={'address': 'closest/http1Service',
+                                  'port': self.http_server_port,
+                                  'protocolVersion': 'HTTP1'})
+
+        e_mgmt.create(type=self.LISTENER_TYPE,
+                      name="ClientListener",
+                      attributes={'address': 'closest/http1Service',
+                                  'port': self.http_listener_port,
+                                  'protocolVersion': 'HTTP1'})
 
-        self.assertEqual(1, len(mgmt.query(type=CONNECTOR_TYPE).results))
+        self.assertEqual(1, len(e_mgmt.query(type=self.LISTENER_TYPE).results))
+        self.assertEqual(1, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
 
         count, error = http1_ping(sport=self.http_server_port,
                                   cport=self.http_listener_port)
         self.assertIsNone(error)
         self.assertEqual(1, count)
 
+        self.i_router.wait_address("closest/http1Service", subscribers=1)
+
+        e_mgmt.delete(type=self.CONNECTOR_TYPE, name="ServerConnector")
+        self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
+        e_mgmt.delete(type=self.LISTENER_TYPE, name="ClientListener")
+        self.assertEqual(0, len(e_mgmt.query(type=self.LISTENER_TYPE).results))
+
+    def test_01_delete_active_connector(self):
+        """Delete an HTTP1 connector that is currently connected to a server.
+        Verify the connection is dropped.
+        """
+        e_mgmt = self.e_router.management
+        self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
+
+        e_mgmt.create(type=self.CONNECTOR_TYPE,
+                      name="ServerConnector",
+                      attributes={'address': 'closest/http1Service',
+                                  'port': self.http_server_port,
+                                  'protocolVersion': 'HTTP1'})
+
+        # verify the connector has been created and attach a dummy server
+        self.assertEqual(1, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
+
+        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        server.bind(("", self.http_server_port))
+        server.setblocking(True)
+        server.settimeout(5)
+        server.listen(1)
+        conn, _ = server.accept()
+        server.close()
+
+        # now check the interior router for the closest/http1Service address
+        self.i_router.wait_address("closest/http1Service", subscribers=1)
+
+        # delete the connector
+        e_mgmt.delete(type=self.CONNECTOR_TYPE, name="ServerConnector")
+        self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
+
+        # expect socket to close
+        while True:
+            try:
+                rd, _, _ = select.select([conn], [], [])
+            except select.error as serror:
+                if serror[0] == errno.EINTR:
+                    print("ignoring interrupt from select(): %s" % str(serror))
+                    continue
+                raise  # assuming fatal...
+            if len(conn.recv(10)) == 0:
+                break
+
+        conn.close()
+
+        # Verify that the address is no longer bound on the interior
+        self.i_router.wait_address_unsubscribed("closest/http1Service")
+
 
 class Http1AdaptorOneRouterTest(Http1OneRouterTestBase,
                                 CommonHttp1OneRouterTest):

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org