You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2021/10/28 18:09:52 UTC
[qpid-dispatch] 01/03: DISPATCH-2260: HTTP/1.x: fix deletion of
httpConnector and httpListener
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch 1.17.x
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 1dfc140ff6d05027da4a0fb69138f1e280cb9b41
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Mon Oct 25 15:32:43 2021 -0400
DISPATCH-2260: HTTP/1.x: fix deletion of httpConnector and httpListener
This closes #1394
---
include/qpid/dispatch/protocol_adaptor.h | 12 ++
src/adaptors/http1/http1_adaptor.c | 19 +--
src/adaptors/http1/http1_client.c | 26 +++-
src/adaptors/http1/http1_private.h | 6 +-
src/adaptors/http1/http1_server.c | 73 +++++++-----
src/router_core/connections.c | 6 +-
src/router_core/router_core_private.h | 15 +--
tests/system_tests_http1_adaptor.py | 197 ++++++++++++++++++++++---------
8 files changed, 237 insertions(+), 117 deletions(-)
diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h
index 654fe74..00551c2 100644
--- a/include/qpid/dispatch/protocol_adaptor.h
+++ b/include/qpid/dispatch/protocol_adaptor.h
@@ -323,6 +323,18 @@ void qdr_protocol_adaptor_free(qdr_core_t *core, qdr_protocol_adaptor_t *adaptor
*/
typedef enum {
+ QD_CONN_OPER_UP,
+ QD_CONN_OPER_DOWN,
+} qd_conn_oper_status_t;
+
+
+typedef enum {
+ QD_CONN_ADMIN_ENABLED,
+ QD_CONN_ADMIN_DELETED
+} qd_conn_admin_status_t;
+
+
+typedef enum {
QD_LINK_ENDPOINT, ///< A link to a connected endpoint
QD_LINK_CONTROL, ///< A link to a peer router for control messages
QD_LINK_ROUTER, ///< A link to a peer router for routed messages
diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
index 64b2a17..3828e89 100644
--- a/src/adaptors/http1/http1_adaptor.c
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -405,7 +405,8 @@ void qdr_http1_q2_unblocked_handler(const qd_alloc_safe_ptr_t context)
//
-// Invoked by the core thread to wake an I/O thread for the connection
+// Invoked by the core/mgmt thread to wake an I/O thread for the connection.
+// Must be thread safe.
//
static void _core_connection_activate_CT(void *context, qdr_connection_t *conn)
{
@@ -670,21 +671,23 @@ static void qd_http1_adaptor_final(void *adaptor_context)
qdr_http1_adaptor_t *adaptor = (qdr_http1_adaptor_t*) adaptor_context;
qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor);
+ qdr_http1_connection_t *hconn = DEQ_HEAD(adaptor->connections);
+ while (hconn) {
+ qdr_http1_connection_free(hconn);
+ hconn = DEQ_HEAD(adaptor->connections);
+ }
qd_http_listener_t *li = DEQ_HEAD(adaptor->listeners);
while (li) {
- qd_http1_delete_listener(0, li);
+ DEQ_REMOVE_HEAD(qdr_http1_adaptor->listeners);
+ qd_http_listener_decref(li);
li = DEQ_HEAD(adaptor->listeners);
}
qd_http_connector_t *ct = DEQ_HEAD(adaptor->connectors);
while (ct) {
- qd_http1_delete_connector(0, ct);
+ DEQ_REMOVE_HEAD(qdr_http1_adaptor->connectors);
+ qd_http_connector_decref(ct);
ct = DEQ_HEAD(adaptor->connectors);
}
- qdr_http1_connection_t *hconn = DEQ_HEAD(adaptor->connections);
- while (hconn) {
- qdr_http1_connection_free(hconn);
- hconn = DEQ_HEAD(adaptor->connections);
- }
sys_mutex_free(adaptor->lock);
qdr_http1_adaptor = NULL;
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 9a6b845..fa73560 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -132,6 +132,8 @@ static qdr_http1_connection_t *_create_client_connection(qd_http_listener_t *li)
ZERO(hconn);
hconn->type = HTTP1_CONN_CLIENT;
+ hconn->admin_status = QD_CONN_ADMIN_ENABLED;
+ hconn->oper_status = QD_CONN_OPER_DOWN;
hconn->qd_server = li->server;
hconn->adaptor = qdr_http1_adaptor;
hconn->handler_context.handler = &_handle_connection_events;
@@ -219,8 +221,14 @@ static void _handle_listener_events(pn_event_t *e, qd_server_t *qd_server, void
} else {
qd_log(log, QD_LOG_TRACE, "Listener closed on %s", host_port);
}
+
+ sys_mutex_lock(qdr_http1_adaptor->lock);
pn_listener_set_context(li->pn_listener, 0);
li->pn_listener = 0;
+ DEQ_REMOVE(qdr_http1_adaptor->listeners, li);
+ sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+ qd_http_listener_decref(li);
}
break;
}
@@ -233,6 +241,9 @@ static void _handle_listener_events(pn_event_t *e, qd_server_t *qd_server, void
// Management Agent API - Create
//
+// Note that this runs on the Management Agent thread, which may be running concurrently with the
+// I/O and timer threads.
+//
qd_http_listener_t *qd_http1_configure_listener(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
{
qd_http_listener_t *li = qd_http_listener(qd->server, &_handle_listener_events);
@@ -256,19 +267,20 @@ qd_http_listener_t *qd_http1_configure_listener(qd_dispatch_t *qd, const qd_http
// Management Agent API - Delete
//
+// Note that this runs on the Management Agent thread, which may be running concurrently with the
+// I/O and timer threads.
+//
void qd_http1_delete_listener(qd_dispatch_t *ignore, qd_http_listener_t *li)
{
if (li) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_INFO, "Deleting HttpListener for %s, %s:%s", li->config.address, li->config.host, li->config.port);
+ sys_mutex_lock(qdr_http1_adaptor->lock);
if (li->pn_listener) {
+ // note that the proactor may immediately schedule the
+ // PN_LISTENER_CLOSED event on another thread...
pn_listener_close(li->pn_listener);
- li->pn_listener = 0;
}
- sys_mutex_lock(qdr_http1_adaptor->lock);
- DEQ_REMOVE(qdr_http1_adaptor->listeners, li);
sys_mutex_unlock(qdr_http1_adaptor->lock);
-
- qd_log(qdr_http1_adaptor->log, QD_LOG_INFO, "Deleted HttpListener for %s, %s:%s", li->config.address, li->config.host, li->config.port);
- qd_http_listener_decref(li);
}
}
@@ -317,6 +329,7 @@ static void _setup_client_connection(qdr_http1_connection_t *hconn)
0, // bind context
0); // bind token
qdr_connection_set_context(hconn->qdr_conn, hconn);
+ hconn->oper_status = QD_CONN_OPER_UP;
qd_log(hconn->adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP connection to client created", hconn->conn_id);
@@ -460,6 +473,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
sys_mutex_unlock(qdr_http1_adaptor->lock);
// at this point the core can no longer activate this connection
+ hconn->oper_status = QD_CONN_OPER_DOWN;
if (hconn->out_link) {
qdr_link_set_context(hconn->out_link, 0);
qdr_link_detach(hconn->out_link, QD_LOST, 0);
diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h
index 8296633..1d4a987 100644
--- a/src/adaptors/http1/http1_private.h
+++ b/src/adaptors/http1/http1_private.h
@@ -105,6 +105,7 @@ struct qdr_http1_request_base_t {
};
DEQ_DECLARE(qdr_http1_request_base_t, qdr_http1_request_list_t);
+
// A single HTTP adaptor connection.
//
struct qdr_http1_connection_t {
@@ -118,6 +119,8 @@ struct qdr_http1_connection_t {
uint64_t conn_id;
qd_handler_context_t handler_context;
h1_codec_connection_type_t type;
+ qd_conn_admin_status_t admin_status;
+ qd_conn_oper_status_t oper_status;
struct {
char *host;
@@ -194,9 +197,6 @@ ALLOC_DECLARE(qdr_http1_connection_t);
// http1_adaptor.c
//
-//int qdr_http1_write_out_data(qdr_http1_connection_t *hconn);
-//void qdr_http1_write_buffer_list(qdr_http1_request_t *hreq, qd_buffer_list_t *blist);
-
void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn);
void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_list_t *fifo, qd_buffer_list_t *blist, uintmax_t octets);
void qdr_http1_enqueue_stream_data(qdr_http1_out_data_list_t *fifo, qd_message_stream_data_t *stream_data);
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index c606a46..a2e93bd 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -152,6 +152,8 @@ static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *ct
ZERO(hconn);
hconn->type = HTTP1_CONN_SERVER;
+ hconn->admin_status = QD_CONN_ADMIN_ENABLED;
+ hconn->oper_status = QD_CONN_OPER_UP;
hconn->qd_server = qd->server;
hconn->adaptor = qdr_http1_adaptor;
hconn->handler_context.handler = &_handle_connection_events;
@@ -216,6 +218,8 @@ static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *ct
// Management Agent API - Create
//
+// Note that this runs on the Management Agent thread, which may be running concurrently with the
+// I/O and timer threads.
qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
{
qd_http_connector_t *c = qd_http_connector(qd->server);
@@ -257,6 +261,8 @@ qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *qd, const qd_ht
// Management Agent API - Delete
//
+// Note that this runs on the Management Agent thread, which may be running concurrently with the
+// I/O and timer threads.
void qd_http1_delete_connector(qd_dispatch_t *ignored, qd_http_connector_t *ct)
{
if (ct) {
@@ -265,15 +271,17 @@ void qd_http1_delete_connector(qd_dispatch_t *ignored, qd_http_connector_t *ct)
sys_mutex_lock(qdr_http1_adaptor->lock);
DEQ_REMOVE(qdr_http1_adaptor->connectors, ct);
qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) ct->ctx;
+ qdr_connection_t *qdr_conn = 0;
if (hconn) {
+ hconn->admin_status = QD_CONN_ADMIN_DELETED;
hconn->server.connector = 0;
ct->ctx = 0;
- if (hconn->qdr_conn)
- // have the core close this connection
- qdr_core_close_connection(hconn->qdr_conn);
+ qdr_conn = hconn->qdr_conn;
}
sys_mutex_unlock(qdr_http1_adaptor->lock);
+ if (qdr_conn)
+ qdr_core_close_connection(qdr_conn);
qd_http_connector_decref(ct);
}
}
@@ -435,25 +443,28 @@ static void _do_reconnect(void *context)
_process_request((_server_request_t*) DEQ_HEAD(hconn->requests));
- // Do not attempt to re-connect if the current request is still in
- // progress. This happens when the server has closed the connection before
- // the request message has fully arrived (!rx_complete).
- // qdr_connection_process() will continue to invoke the
- // qdr_http1_server_core_link_deliver callback until the request message is
- // complete.
-
- // false positive: head request is removed before it is freed, null is passed
- /* coverity[pass_freed_arg] */
- if (!_is_request_in_progress((_server_request_t*) DEQ_HEAD(hconn->requests))) {
- qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
- "[C%"PRIu64"] Connecting to HTTP server...", conn_id);
- sys_mutex_lock(qdr_http1_adaptor->lock);
- hconn->raw_conn = pn_raw_connection();
- pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
- // this next call may immediately reschedule the connection on another I/O
- // thread. After this call hconn may no longer be valid!
- pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port);
- sys_mutex_unlock(qdr_http1_adaptor->lock);
+ if (hconn->admin_status == QD_CONN_ADMIN_ENABLED) {
+
+ // Do not attempt to re-connect if the current request is still in
+ // progress. This happens when the server has closed the connection before
+ // the request message has fully arrived (!rx_complete).
+ // qdr_connection_process() will continue to invoke the
+ // qdr_http1_server_core_link_deliver callback until the request message is
+ // complete.
+
+ // false positive: head request is removed before it is freed, null is passed
+ /* coverity[pass_freed_arg] */
+ if (!_is_request_in_progress((_server_request_t*) DEQ_HEAD(hconn->requests))) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"] Connecting to HTTP server...", conn_id);
+ sys_mutex_lock(qdr_http1_adaptor->lock);
+ hconn->raw_conn = pn_raw_connection();
+ pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
+ // this next call may immediately reschedule the connection on another I/O
+ // thread. After this call hconn may no longer be valid!
+ pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port);
+ sys_mutex_unlock(qdr_http1_adaptor->lock);
+ }
}
}
@@ -584,7 +595,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
//
bool reconnect = false;
- if (hconn->qdr_conn) {
+ if (hconn->admin_status == QD_CONN_ADMIN_ENABLED && hconn->qdr_conn) {
if (hconn->server.link_timeout == 0) {
hconn->server.link_timeout = qd_timer_now() + LINK_TIMEOUT_MSEC;
hconn->server.reconnect_pause = 0;
@@ -600,13 +611,15 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
// prevent core activation
sys_mutex_lock(qdr_http1_adaptor->lock);
hconn->raw_conn = 0;
- if (reconnect && hconn->server.reconnect_timer)
+ if (reconnect && hconn->server.reconnect_timer) {
qd_timer_schedule(hconn->server.reconnect_timer, hconn->server.reconnect_pause);
+ sys_mutex_unlock(qdr_http1_adaptor->lock);
+ // do not manipulate hconn further as it may now be processed by the
+ // timer thread
+ return;
+ }
sys_mutex_unlock(qdr_http1_adaptor->lock);
-
- // do not manipulate hconn further as it may now be processed by the
- // timer thread
- return;
+ break;
}
case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
_send_request_message((_server_request_t*) DEQ_HEAD(hconn->requests));
@@ -1722,8 +1735,10 @@ void qdr_http1_server_core_conn_close(qdr_http1_adaptor_t *adaptor,
sys_mutex_unlock(qdr_http1_adaptor->lock);
// the core thread can no longer activate this connection
+ hconn->oper_status = QD_CONN_OPER_DOWN;
+ _teardown_server_links(hconn);
qdr_connection_closed(qdr_conn);
- qdr_http1_close_connection(hconn, "Connection closed by management");
+ qdr_http1_close_connection(hconn, error);
// it is expected that this callback is the final callback before returning
// from qdr_connection_process(). Free hconn when qdr_connection_process returns.
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 4f28d48..a5cb937 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -102,8 +102,8 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
conn->policy_spec = policy_spec;
conn->link_capacity = link_capacity;
conn->mask_bit = -1;
- conn->admin_status = QDR_CONN_ADMIN_ENABLED;
- conn->oper_status = QDR_CONN_OPER_UP;
+ conn->admin_status = QD_CONN_ADMIN_ENABLED;
+ conn->oper_status = QD_CONN_OPER_UP;
DEQ_INIT(conn->links);
DEQ_INIT(conn->work_list);
DEQ_INIT(conn->streaming_link_pool);
@@ -279,7 +279,7 @@ void qdr_close_connection_CT(qdr_core_t *core, qdr_connection_t *conn)
{
conn->closed = true;
conn->error = qdr_error(QD_AMQP_COND_CONNECTION_FORCED, "Connection forced-closed by management request");
- conn->admin_status = QDR_CONN_ADMIN_DELETED;
+ conn->admin_status = QD_CONN_ADMIN_DELETED;
//Activate the connection, so the I/O threads can finish the job.
qdr_connection_activate_CT(core, conn);
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index c08753a..e4211ba 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -654,17 +654,6 @@ ALLOC_DECLARE(qdr_connection_info_t);
DEQ_DECLARE(qdr_link_route_t, qdr_link_route_list_t);
-typedef enum {
- QDR_CONN_OPER_UP,
-} qdr_conn_oper_status_t;
-
-
-typedef enum {
- QDR_CONN_ADMIN_ENABLED,
- QDR_CONN_ADMIN_DELETED
-} qdr_conn_admin_status_t;
-
-
struct qdr_connection_t {
DEQ_LINKS(qdr_connection_t);
DEQ_LINKS_N(ACTIVATE, qdr_connection_t);
@@ -690,8 +679,8 @@ struct qdr_connection_t {
qdr_connection_info_t *connection_info;
void *user_context; /* Updated from IO thread, use work_lock */
qdr_link_route_list_t conn_link_routes; // connection scoped link routes
- qdr_conn_oper_status_t oper_status;
- qdr_conn_admin_status_t admin_status;
+ qd_conn_oper_status_t oper_status;
+ qd_conn_admin_status_t admin_status;
qdr_error_t *error;
bool closed; // This bit is used in the case where a client is trying to force close this connection.
uint32_t conn_uptime; // Timestamp which can be used to calculate the number of seconds this connection has been up and running.
diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py
index 2f2117b..986a265 100644
--- a/tests/system_tests_http1_adaptor.py
+++ b/tests/system_tests_http1_adaptor.py
@@ -43,85 +43,106 @@ from http1_tests import Http1CurlTestsMixIn
class Http1AdaptorManagementTest(TestCase):
"""
- Test Creation and deletion of HTTP1 management entities
+ Test Creation and deletion of HTTP1 management entities.
"""
@classmethod
def setUpClass(cls):
super(Http1AdaptorManagementTest, cls).setUpClass()
+ cls.LISTENER_TYPE = 'org.apache.qpid.dispatch.httpListener'
+ cls.CONNECTOR_TYPE = 'org.apache.qpid.dispatch.httpConnector'
+ cls.CONNECTION_TYPE = 'org.apache.qpid.dispatch.connection'
+
+ cls.interior_edge_port = cls.tester.get_port()
+ cls.interior_mgmt_port = cls.tester.get_port()
+ cls.edge_mgmt_port = cls.tester.get_port()
+
cls.http_server_port = cls.tester.get_port()
cls.http_listener_port = cls.tester.get_port()
- config = [
- ('router', {'mode': 'standalone',
- 'id': 'HTTP1MgmtTest',
- 'allowUnsettledMulticast': 'yes'}),
+ i_config = [
+ ('router', {'mode': 'interior',
+ 'id': 'HTTP1MgmtTestInterior'}),
('listener', {'role': 'normal',
- 'port': cls.tester.get_port()}),
+ 'port': cls.interior_mgmt_port}),
+ ('listener', {'role': 'edge', 'port': cls.interior_edge_port}),
('address', {'prefix': 'closest', 'distribution': 'closest'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
]
+ config = Qdrouterd.Config(i_config)
+ cls.i_router = cls.tester.qdrouterd('HTTP1MgmtTestInterior', config, wait=False)
- config = Qdrouterd.Config(config)
- cls.router = cls.tester.qdrouterd('HTTP1MgmtTest', config, wait=True)
-
- def test_01_mgmt(self):
- """
- Create and delete HTTP1 connectors and listeners
- """
- LISTENER_TYPE = 'org.apache.qpid.dispatch.httpListener'
- CONNECTOR_TYPE = 'org.apache.qpid.dispatch.httpConnector'
- CONNECTION_TYPE = 'org.apache.qpid.dispatch.connection'
-
- mgmt = self.router.management
- self.assertEqual(0, len(mgmt.query(type=LISTENER_TYPE).results))
- self.assertEqual(0, len(mgmt.query(type=CONNECTOR_TYPE).results))
-
- mgmt.create(type=CONNECTOR_TYPE,
- name="ServerConnector",
- attributes={'address': 'http1',
- 'port': self.http_server_port,
- 'protocolVersion': 'HTTP1'})
-
- mgmt.create(type=LISTENER_TYPE,
- name="ClientListener",
- attributes={'address': 'http1',
- 'port': self.http_listener_port,
- 'protocolVersion': 'HTTP1'})
+ e_config = [
+ ('router', {'mode': 'edge',
+ 'id': 'HTTP1MgmtTestEdge'}),
+ ('listener', {'role': 'normal',
+ 'port': cls.edge_mgmt_port}),
+ ('connector', {'name': 'edge', 'role': 'edge',
+ 'port': cls.interior_edge_port}),
+ ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+ ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+ ]
+ config = Qdrouterd.Config(e_config)
+ cls.e_router = cls.tester.qdrouterd('HTTP1MgmtTestEdge', config,
+ wait=False)
+
+ cls.i_router.wait_ready()
+ cls.e_router.wait_ready()
+
+ def test_01_create_delete(self):
+ """ Create and delete HTTP1 connectors and listeners. The
+ connectors/listeners are created on the edge router. Verify that the
+ adaptor properly notifies the interior of the subscribers/producers.
+ """
+ e_mgmt = self.e_router.management
+ self.assertEqual(0, len(e_mgmt.query(type=self.LISTENER_TYPE).results))
+ self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
+
+ e_mgmt.create(type=self.CONNECTOR_TYPE,
+ name="ServerConnector",
+ attributes={'address': 'closest/http1Service',
+ 'port': self.http_server_port,
+ 'protocolVersion': 'HTTP1'})
+
+ e_mgmt.create(type=self.LISTENER_TYPE,
+ name="ClientListener",
+ attributes={'address': 'closest/http1Service',
+ 'port': self.http_listener_port,
+ 'protocolVersion': 'HTTP1'})
# verify the entities have been created and http traffic works
- self.assertEqual(1, len(mgmt.query(type=LISTENER_TYPE).results))
- self.assertEqual(1, len(mgmt.query(type=CONNECTOR_TYPE).results))
+ self.assertEqual(1, len(e_mgmt.query(type=self.LISTENER_TYPE).results))
+ self.assertEqual(1, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
count, error = http1_ping(sport=self.http_server_port,
cport=self.http_listener_port)
self.assertIsNone(error)
self.assertEqual(1, count)
+ # now check the interior router for the closest/http1Service address
+ self.i_router.wait_address("closest/http1Service", subscribers=1)
+
#
- # delete the connector and wait for the associated connection to be
- # removed
+ # delete the connector and listener; wait for the associated connection
+ # to be removed
#
+ e_mgmt.delete(type=self.CONNECTOR_TYPE, name="ServerConnector")
+ self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
+ e_mgmt.delete(type=self.LISTENER_TYPE, name="ClientListener")
+ self.assertEqual(0, len(e_mgmt.query(type=self.LISTENER_TYPE).results))
- mgmt.delete(type=CONNECTOR_TYPE, name="ServerConnector")
- self.assertEqual(0, len(mgmt.query(type=CONNECTOR_TYPE).results))
-
- retry = 20 # 20 * 0.25 = 5 sec
- hconns = 0
- while retry:
- obj = mgmt.query(type=CONNECTION_TYPE,
- attribute_names=["protocol"])
+ # will hit test timeout on failure:
+ while True:
+ hconns = 0
+ obj = e_mgmt.query(type=self.CONNECTION_TYPE,
+ attribute_names=["protocol"])
for item in obj.get_dicts():
if "http/1.x" in item["protocol"]:
hconns += 1
if hconns == 0:
break
sleep(0.25)
- retry -= 1
- hconns = 0
-
- self.assertEqual(0, hconns, msg="HTTP connection not deleted")
# When a connector is configured the router will periodically attempt
# to connect to the server address. To prove that the connector has
@@ -137,22 +158,88 @@ class Http1AdaptorManagementTest(TestCase):
conn, addr = s.accept()
s.close()
+ # Verify that the address is no longer bound on the interior
+ self.i_router.wait_address_unsubscribed("closest/http1Service")
+
#
- # re-create the connector and verify it works
+ # re-create the connector and listener; verify it works
#
- mgmt.create(type=CONNECTOR_TYPE,
- name="ServerConnector",
- attributes={'address': 'http1',
- 'port': self.http_server_port,
- 'protocolVersion': 'HTTP1'})
+ e_mgmt.create(type=self.CONNECTOR_TYPE,
+ name="ServerConnector",
+ attributes={'address': 'closest/http1Service',
+ 'port': self.http_server_port,
+ 'protocolVersion': 'HTTP1'})
+
+ e_mgmt.create(type=self.LISTENER_TYPE,
+ name="ClientListener",
+ attributes={'address': 'closest/http1Service',
+ 'port': self.http_listener_port,
+ 'protocolVersion': 'HTTP1'})
- self.assertEqual(1, len(mgmt.query(type=CONNECTOR_TYPE).results))
+ self.assertEqual(1, len(e_mgmt.query(type=self.LISTENER_TYPE).results))
+ self.assertEqual(1, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
count, error = http1_ping(sport=self.http_server_port,
cport=self.http_listener_port)
self.assertIsNone(error)
self.assertEqual(1, count)
+ self.i_router.wait_address("closest/http1Service", subscribers=1)
+
+ e_mgmt.delete(type=self.CONNECTOR_TYPE, name="ServerConnector")
+ self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
+ e_mgmt.delete(type=self.LISTENER_TYPE, name="ClientListener")
+ self.assertEqual(0, len(e_mgmt.query(type=self.LISTENER_TYPE).results))
+
+ def test_01_delete_active_connector(self):
+ """Delete an HTTP1 connector that is currently connected to a server.
+ Verify the connection is dropped.
+ """
+ e_mgmt = self.e_router.management
+ self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
+
+ e_mgmt.create(type=self.CONNECTOR_TYPE,
+ name="ServerConnector",
+ attributes={'address': 'closest/http1Service',
+ 'port': self.http_server_port,
+ 'protocolVersion': 'HTTP1'})
+
+ # verify the connector has been created and attach a dummy server
+ self.assertEqual(1, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
+
+ server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ server.bind(("", self.http_server_port))
+ server.setblocking(True)
+ server.settimeout(5)
+ server.listen(1)
+ conn, _ = server.accept()
+ server.close()
+
+ # now check the interior router for the closest/http1Service address
+ self.i_router.wait_address("closest/http1Service", subscribers=1)
+
+ # delete the connector
+ e_mgmt.delete(type=self.CONNECTOR_TYPE, name="ServerConnector")
+ self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results))
+
+ # expect socket to close
+ while True:
+ try:
+ rd, _, _ = select.select([conn], [], [])
+ except select.error as serror:
+ if serror[0] == errno.EINTR:
+ print("ignoring interrupt from select(): %s" % str(serror))
+ continue
+ raise # assuming fatal...
+ if len(conn.recv(10)) == 0:
+ break
+
+ conn.close()
+
+ # Verify that the address is no longer bound on the interior
+ self.i_router.wait_address_unsubscribed("closest/http1Service")
+
class Http1AdaptorOneRouterTest(Http1OneRouterTestBase,
CommonHttp1OneRouterTest):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org