You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2020/10/29 14:31:39 UTC
[qpid-dispatch] branch dev-protocol-adaptors-2 updated:
DISPATCH-1816: prevent activation from running after reconnecting
DISPATCH-1791: avoid leaking qdr_delivery_t
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
new e075b43 DISPATCH-1816: prevent activation from running after reconnecting DISPATCH-1791: avoid leaking qdr_delivery_t
e075b43 is described below
commit e075b4390fc3de2c8a3acdbf14315532165883f1
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Tue Oct 27 13:59:22 2020 -0400
DISPATCH-1816: prevent activation from running after reconnecting
DISPATCH-1791: avoid leaking qdr_delivery_t
Move the adaptor finalize prior to stopping the core thread. This
allows the core thread to release qdr_delivery_t that were decref'ed
during the adaptor finalize code.
This closes #898
---
src/adaptors/http1/http1_adaptor.c | 36 ++--
src/adaptors/http1/http1_client.c | 25 ++-
src/adaptors/http1/http1_private.h | 7 +-
src/adaptors/http1/http1_server.c | 330 ++++++++++++++++++++++--------------
src/router_core/router_core.c | 4 +-
tests/system_tests_http1_adaptor.py | 143 ++++++++++++++++
6 files changed, 395 insertions(+), 150 deletions(-)
diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
index eb8349a..fe9581b 100644
--- a/src/adaptors/http1/http1_adaptor.c
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -74,6 +74,7 @@ void qdr_http1_request_base_cleanup(qdr_http1_request_base_t *hreq)
DEQ_REMOVE(hreq->hconn->requests, hreq);
h1_codec_request_state_cancel(hreq->lib_rs);
free(hreq->response_addr);
+ free(hreq->site);
}
}
@@ -82,21 +83,24 @@ void qdr_http1_connection_free(qdr_http1_connection_t *hconn)
{
if (hconn) {
pn_raw_connection_t *rconn = 0;
- qd_timer_t *t1 = 0;
- qd_timer_t *t2 = 0;
// prevent core from activating this connection while it is being torn
- // down. see _core_connection_activate_CT
+ // down. Also prevent timer callbacks from running. see
+ // _core_connection_activate_CT, and _do_reconnect/_do_activate in
+ // http1_server.c
//
sys_mutex_lock(qdr_http1_adaptor->lock);
{
DEQ_REMOVE(qdr_http1_adaptor->connections, hconn);
- t1 = hconn->server.reconnect_timer;
+ qd_timer_free(hconn->server.reconnect_timer);
hconn->server.reconnect_timer = 0;
- t2 = hconn->server.activate_timer;
+ qd_timer_free(hconn->server.activate_timer);
hconn->server.activate_timer = 0;
rconn = hconn->raw_conn;
hconn->raw_conn = 0;
+ if (hconn->qdr_conn)
+ qdr_connection_set_context(hconn->qdr_conn, 0);
+ hconn->qdr_conn = 0;
}
sys_mutex_unlock(qdr_http1_adaptor->lock);
@@ -107,9 +111,6 @@ void qdr_http1_connection_free(qdr_http1_connection_t *hconn)
else
qdr_http1_client_conn_cleanup(hconn);
- qd_timer_free(t1);
- qd_timer_free(t2);
-
h1_codec_connection_free(hconn->http_conn);
if (rconn) {
pn_raw_connection_set_context(rconn, 0);
@@ -231,6 +232,7 @@ void qdr_http1_error_response(qdr_http1_request_base_t *hreq,
if (hreq->lib_rs) {
bool ignored;
h1_codec_tx_response(hreq->lib_rs, error_code, reason, 1, 1);
+ h1_codec_tx_add_header(hreq->lib_rs, "Content-Length", "0");
h1_codec_tx_done(hreq->lib_rs, &ignored);
}
}
@@ -606,22 +608,24 @@ static void _core_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t d
}
}
+
+// This is invoked by management to initiate the connection close process.
+// Once the raw conn is closed (DISCONNECT event) we call qdr_connection_closed()
+// to finish the close processing
+//
static void _core_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error)
{
qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_connection_get_context(conn);
if (hconn) {
+ assert(hconn->qdr_conn == conn);
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"] HTTP/1.x closing connection", hconn->conn_id);
char *qdr_error = error ? qdr_error_description(error) : 0;
- qdr_http1_close_connection(hconn, qdr_error);
-
- sys_mutex_lock(qdr_http1_adaptor->lock);
- qdr_connection_set_context(conn, 0);
- sys_mutex_unlock(qdr_http1_adaptor->lock);
-
- hconn->qdr_conn = 0;
- hconn->in_link = hconn->out_link = 0;
+ if (hconn->type == HTTP1_CONN_SERVER)
+ qdr_http1_server_core_conn_close(qdr_http1_adaptor, hconn, qdr_error);
+ else
+ qdr_http1_client_core_conn_close(qdr_http1_adaptor, hconn, qdr_error);
free(qdr_error);
}
}
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 0a82599..7509f6c 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -385,15 +385,14 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
if (hconn->out_link) {
qdr_link_set_context(hconn->out_link, 0);
- qdr_link_detach(hconn->out_link, QD_CLOSED, 0);
hconn->out_link = 0;
}
if (hconn->in_link) {
qdr_link_set_context(hconn->in_link, 0);
- qdr_link_detach(hconn->in_link, QD_CLOSED, 0);
hconn->in_link = 0;
}
if (hconn->qdr_conn) {
+ qdr_connection_set_context(hconn->qdr_conn, 0);
qdr_connection_closed(hconn->qdr_conn);
hconn->qdr_conn = 0;
}
@@ -995,9 +994,11 @@ static bool _encode_response_headers(_client_request_t *hreq,
bool ok = false;
qd_message_t *msg = qdr_delivery_message(rmsg->dlv);
- qd_iterator_t *group_id_itr = qd_message_field_iterator(msg, QD_FIELD_GROUP_ID);
- hreq->base.site = (char*) qd_iterator_copy(group_id_itr);
- qd_iterator_free(group_id_itr);
+ if (!hreq->base.site) {
+ qd_iterator_t *group_id_itr = qd_message_field_iterator(msg, QD_FIELD_GROUP_ID);
+ hreq->base.site = (char*) qd_iterator_copy(group_id_itr);
+ qd_iterator_free(group_id_itr);
+ }
qd_iterator_t *app_props_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES);
if (app_props_iter) {
@@ -1319,3 +1320,17 @@ void qdr_http1_client_conn_cleanup(qdr_http1_connection_t *hconn)
_client_request_free(hreq);
}
}
+
+
+// handle connection close request from management
+//
+void qdr_http1_client_core_conn_close(qdr_http1_adaptor_t *adaptor,
+ qdr_http1_connection_t *hconn,
+ const char *error)
+{
+ // initiate close of the raw conn. the adaptor will call
+ // qdr_connection_closed() and clean up once the DISCONNECT
+ // event is processed
+ //
+ qdr_http1_close_connection(hconn, error);
+}
diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h
index 593803d..d19fac7 100644
--- a/src/adaptors/http1/http1_private.h
+++ b/src/adaptors/http1/http1_private.h
@@ -247,7 +247,9 @@ void qdr_http1_client_core_delivery_update(qdr_http1_adaptor_t *adaptor,
uint64_t disp,
bool settled);
void qdr_http1_client_conn_cleanup(qdr_http1_connection_t *hconn);
-
+void qdr_http1_client_core_conn_close(qdr_http1_adaptor_t *adaptor,
+ qdr_http1_connection_t *hconn,
+ const char *error);
// http1_server.c protocol adaptor callbacks
//
void qdr_http1_server_core_link_flow(qdr_http1_adaptor_t *adaptor,
@@ -266,6 +268,9 @@ void qdr_http1_server_core_delivery_update(qdr_http1_adaptor_t *adaptor,
uint64_t disp,
bool settled);
void qdr_http1_server_conn_cleanup(qdr_http1_connection_t *hconn);
+void qdr_http1_server_core_conn_close(qdr_http1_adaptor_t *adaptor,
+ qdr_http1_connection_t *hconn,
+ const char *error);
// recording of stats:
void qdr_http1_record_client_request_info(qdr_http1_adaptor_t *adaptor, qdr_http1_request_base_t *request);
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index 98af72a..77da28f 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -363,20 +363,65 @@ static void _teardown_server_links(qdr_http1_connection_t *hconn)
}
+//
+// A note about reconnect and activate timer handlers:
+//
+// Both _do_reconnect and _do_activate are run via separate qd_timers.
+// qd_timers execute on an arbitrary I/O thread and are guaranteed NOT to be
+// run in parallel. The _do_activate timer is started by the core thread via
+// _core_connection_activate_CT (http1_adaptor.c). The _do_reconnect timer is
+// started by the I/O thread handling the server raw connection
+// PN_RAW_CONNECTION_DISCONNECTED event.
+//
+// Since the server PN_RAW_CONNECTION_DISCONNECTED handler releases the raw
+// connection and at a later point in time _do_reconnect creates a new raw
+// connection it is guaranteed that _do_reconnect will NOT run in parallel with
+// an I/O thread running the raw connection event handler (since no such raw
+// connection exists when _do_reconnect is run)
+//
+// However it is possible to have a race between an I/O thread running
+// _do_activate and an I/O thread running the raw connection event handler IF
+// _do_activate runs _after_ _do_reconnect has run (since a new raw connection
+// is created and can be immediately scheduled).
+//
+// To avoid this race the _do_reconnect handler cancels the _do_activate timer
+// to prevent it from running immediately after _do_reconnect completes
+// (remember: timer handlers never run in parallel). To prevent the core
+// thread from rescheduling _do_activate after _do_reconnect runs, a lock is
+// held by _do_reconnect while it sets hconn->raw_conn.
+//
+
+
// This adapter attempts to keep the connection to the server up as long as the
// connector is configured. This is called via a timer scheduled when the
// PN_CONNECTION_CLOSE event is handled.
+// (See above note)
//
static void _do_reconnect(void *context)
{
qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context;
+ bool connecting = false;
+
+ // lock out core activation
+ sys_mutex_lock(qdr_http1_adaptor->lock);
+
+ // prevent _do_activate() from trying to process the qdr_connection after
+ // we schedule the raw connection on another thread
+ if (hconn->server.activate_timer)
+ qd_timer_cancel(hconn->server.activate_timer);
if (!hconn->raw_conn) {
- qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] Connecting to HTTP server...", hconn->conn_id);
+ connecting = true;
hconn->raw_conn = pn_raw_connection();
pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
// this call may reschedule the connection on another I/O thread:
pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port);
}
+
+ sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+ if (connecting)
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"] Connecting to HTTP server...", hconn->conn_id);
}
@@ -385,12 +430,19 @@ static void _do_reconnect(void *context)
// connection. If the core needs to process the qdr_connection_t when there is
// no raw connection to wake this zero-length timer handler will perform the
// connection processing (under the I/O thread).
+// (See above note)
//
static void _do_activate(void *context)
{
qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context;
- if (hconn->qdr_conn) {
+ if (!hconn->raw_conn && hconn->qdr_conn) {
while (qdr_connection_process(hconn->qdr_conn)) {}
+ if (!hconn->qdr_conn) {
+ // the qdr_connection_t has been closed
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"] HTTP/1.x server connection closed", hconn->conn_id);
+ qdr_http1_connection_free(hconn);
+ }
}
}
@@ -431,15 +483,10 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
pn_raw_connection_set_context(hconn->raw_conn, 0);
hconn->close_connection = false;
- if (!hconn->qdr_conn) {
- // the router has closed this connection so do not try to
- // re-establish it
- qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connection closed", hconn->conn_id);
- hconn->raw_conn = 0;
- qdr_http1_connection_free(hconn);
- return;
- }
+ qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connection closed", hconn->conn_id);
+ // prevent core from activating raw conn since it will no longer exist
+ // on return from the handler
sys_mutex_lock(qdr_http1_adaptor->lock);
hconn->raw_conn = 0;
sys_mutex_unlock(qdr_http1_adaptor->lock);
@@ -454,19 +501,22 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
_cancel_request(hreq);
}
- // reconnect to the server. Leave the links intact so pending requests
- // are not aborted. Once we've failed to reconnect after MAX_RECONNECT
- // tries drop the links to prevent additional request from arriving.
- //
- qd_duration_t nap_time = RETRY_PAUSE_MSEC * hconn->server.reconnect_count;
- if (hconn->server.reconnect_count == MAX_RECONNECT) {
- qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Server not responding - disconnecting...", hconn->conn_id);
- _teardown_server_links(hconn);
- } else {
- hconn->server.reconnect_count += 1; // increase next sleep interval
+ if (hconn->qdr_conn) {
+ //
+ // reconnect to the server. Leave the links intact so pending requests
+ // are not aborted. Once we've failed to reconnect after MAX_RECONNECT
+ // tries drop the links to prevent additional request from arriving.
+ //
+ qd_duration_t nap_time = RETRY_PAUSE_MSEC * hconn->server.reconnect_count;
+ if (hconn->server.reconnect_count == MAX_RECONNECT) {
+ qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Server not responding - disconnecting...", hconn->conn_id);
+ _teardown_server_links(hconn);
+ } else {
+ hconn->server.reconnect_count += 1; // increase next sleep interval
+ }
+ qd_timer_schedule(hconn->server.reconnect_timer, nap_time);
}
- qd_timer_schedule(hconn->server.reconnect_timer, nap_time);
- return;
+ break;
}
case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need write buffers", hconn->conn_id);
@@ -523,10 +573,19 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
break;
}
- // remove me:
- if (hconn) {
+ //
+ // After each event check connection and request status
+ //
+ if (!hconn->qdr_conn) {
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP/1.x server connection closed", hconn->conn_id);
+ qdr_http1_connection_free(hconn);
+
+ } else {
+
+ bool need_close = false;
_server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
if (hreq) {
+ // remove me:
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP is server request complete????", hconn->conn_id);
qd_log(log, QD_LOG_DEBUG, " codec_completed=%s cancelled=%s",
hreq->codec_completed ? "Complete" : "Not Complete",
@@ -538,124 +597,119 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
(int) DEQ_SIZE(hreq->out_data.fifo),
qdr_http1_out_data_buffers_outstanding(&hreq->out_data),
(int) DEQ_SIZE(hreq->responses));
- }
- }
- // Check for completed or cancelled requests
+ // Check for completed or cancelled requests
- bool need_close = false;
- _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
- if (hreq) {
-
- if (hreq->cancelled) {
+ if (hreq->cancelled) {
- // request: have to wait until all buffers returned from proton
- // before we can release the request delivery...
- if (qdr_http1_out_data_buffers_outstanding(&hreq->out_data))
- return;
-
- if (hreq->request_dlv) {
- // let the message drain... (TODO@(kgiusti) is this necessary?
- if (!qdr_delivery_receive_complete(hreq->request_dlv))
+ // request: have to wait until all buffers returned from proton
+ // before we can release the request delivery...
+ if (qdr_http1_out_data_buffers_outstanding(&hreq->out_data))
return;
- uint64_t dispo = hreq->request_dispo ? hreq->request_dispo : PN_MODIFIED;
- qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
- hreq->request_dlv,
- dispo,
- true, // settled
- 0, // error
- 0, // dispo data
- false);
- qdr_delivery_set_context(hreq->request_dlv, 0);
- qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request cancelled");
- hreq->request_dlv = 0;
- }
-
- _server_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
- while (rmsg) {
- if (rmsg->dlv) {
- qd_message_set_receive_complete(qdr_delivery_message(rmsg->dlv));
- qdr_delivery_set_aborted(rmsg->dlv, true);
+ if (hreq->request_dlv) {
+ // let the message drain... (TODO(kgiusti): is this necessary?)
+ if (!qdr_delivery_receive_complete(hreq->request_dlv))
+ return;
+
+ uint64_t dispo = hreq->request_dispo ? hreq->request_dispo : PN_MODIFIED;
+ qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+ hreq->request_dlv,
+ dispo,
+ true, // settled
+ 0, // error
+ 0, // dispo data
+ false);
+ qdr_delivery_set_context(hreq->request_dlv, 0);
+ qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request cancelled");
+ hreq->request_dlv = 0;
}
- _server_response_msg_free(hreq, rmsg);
- rmsg = DEQ_HEAD(hreq->responses);
- }
- // The state of the connection to the server will be unknown if
- // this request was not completed.
- if (!hreq->codec_completed && hreq->base.out_http1_octets > 0)
- need_close = true;
+ _server_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
+ while (rmsg) {
+ if (rmsg->dlv) {
+ qd_message_set_receive_complete(qdr_delivery_message(rmsg->dlv));
+ qdr_delivery_set_aborted(rmsg->dlv, true);
+ }
+ _server_response_msg_free(hreq, rmsg);
+ rmsg = DEQ_HEAD(hreq->responses);
+ }
- _server_request_free(hreq);
+ // The state of the connection to the server will be unknown if
+ // this request was not completed.
+ if (!hreq->codec_completed && hreq->base.out_http1_octets > 0)
+ need_close = true;
- } else {
-
- // Can the request disposition be updated? Disposition can be
- // updated after the entire encoded request has been written to the
- // server.
- if (!hreq->request_acked &&
- hreq->request_encoded &&
- DEQ_SIZE(hreq->out_data.fifo) == 0) {
-
- qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
- hreq->request_dlv,
- hreq->request_dispo,
- false, // settled
- 0, // error
- 0, // dispo data
- false);
- hreq->request_acked = true;
- }
-
- // Can we settle request? Settle the request delivery after all
- // response messages have been received from the server
- // (codec_complete). Note that the responses may not have finished
- // being delivered to the core (lack of credit, etc.)
- //
- if (!hreq->request_settled &&
- hreq->request_acked && // implies out_data done
- hreq->codec_completed) {
-
- qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
- hreq->request_dlv,
- hreq->request_dispo,
- true, // settled
- 0, // error
- 0, // dispo data
- false);
- // can now release the delivery
- qdr_delivery_set_context(hreq->request_dlv, 0);
- qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request settled");
- hreq->request_dlv = 0;
-
- hreq->request_settled = true;
- }
+ _server_request_free(hreq);
- // Has the entire request/response completed? It is complete after
- // the request message has been settled and all responses have been
- // delivered to the core.
- //
- if (hreq->request_acked &&
- hreq->request_settled &&
- DEQ_SIZE(hreq->responses) == 0) {
+ } else {
+
+ // Can the request disposition be updated? Disposition can be
+ // updated after the entire encoded request has been written to the
+ // server.
+ if (!hreq->request_acked &&
+ hreq->request_encoded &&
+ DEQ_SIZE(hreq->out_data.fifo) == 0) {
+
+ qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+ hreq->request_dlv,
+ hreq->request_dispo,
+ false, // settled
+ 0, // error
+ 0, // dispo data
+ false);
+ hreq->request_acked = true;
+ }
- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request completed!", hconn->conn_id);
- _server_request_free(hreq);
+ // Can we settle request? Settle the request delivery after all
+ // response messages have been received from the server
+ // (codec_complete). Note that the responses may not have finished
+ // being delivered to the core (lack of credit, etc.)
+ //
+ if (!hreq->request_settled &&
+ hreq->request_acked && // implies out_data done
+ hreq->codec_completed) {
+
+ qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+ hreq->request_dlv,
+ hreq->request_dispo,
+ true, // settled
+ 0, // error
+ 0, // dispo data
+ false);
+ // can now release the delivery
+ qdr_delivery_set_context(hreq->request_dlv, 0);
+ qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request settled");
+ hreq->request_dlv = 0;
+
+ hreq->request_settled = true;
+ }
- // coverity ignores the fact that _server_request_free() calls
- // the base cleanup which removes hreq from hconn->requests.
- // coverity[use_after_free]
- hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
- if (hreq)
- _write_pending_request(hreq);
+ // Has the entire request/response completed? It is complete after
+ // the request message has been settled and all responses have been
+ // delivered to the core.
+ //
+ if (hreq->request_acked &&
+ hreq->request_settled &&
+ DEQ_SIZE(hreq->responses) == 0) {
+
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request completed!", hconn->conn_id);
+ _server_request_free(hreq);
+
+ // coverity ignores the fact that _server_request_free() calls
+ // the base cleanup which removes hreq from hconn->requests.
+ // coverity[use_after_free]
+ hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+ if (hreq)
+ _write_pending_request(hreq);
+ }
}
}
- }
- if (need_close) {
- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Closing connection!", hconn->conn_id);
- qdr_http1_close_connection(hconn, "Request cancelled");
+ if (need_close) {
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Closing connection!", hconn->conn_id);
+ qdr_http1_close_connection(hconn, "Request cancelled");
+ }
}
}
@@ -1435,3 +1489,25 @@ static void _cancel_request(_server_request_t *hreq)
// cleanup occurs at the end of the connection event handler
}
+
+
+// handle connection close request from management
+//
+void qdr_http1_server_core_conn_close(qdr_http1_adaptor_t *adaptor,
+ qdr_http1_connection_t *hconn,
+ const char *error)
+{
+ qdr_connection_t *qdr_conn = hconn->qdr_conn;
+
+ // prevent activation by core thread
+ sys_mutex_lock(qdr_http1_adaptor->lock);
+ qdr_connection_set_context(hconn->qdr_conn, 0);
+ hconn->qdr_conn = 0;
+ sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+ qdr_connection_closed(qdr_conn);
+ qdr_http1_close_connection(hconn, "Connection closed by management");
+
+ // it is expected that this callback is the final callback before returning
+ // from qdr_connection_process(). Free hconn when qdr_connection_process returns.
+}
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index c6dd6e8..cce7eda 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -101,6 +101,9 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area,
void qdr_core_free(qdr_core_t *core)
{
+ // have adaptors clean up all core resources
+ qdr_adaptors_finalize(core);
+
//
// Stop and join the thread
//
@@ -221,7 +224,6 @@ void qdr_core_free(qdr_core_t *core)
// at this point all the conn identifiers have been freed
qd_hash_free(core->conn_id_hash);
- qdr_adaptors_finalize(core);
qdr_modules_finalize(core);
qdr_agent_free(core->mgmt_agent);
diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py
index 6b74e7b..3e83279 100644
--- a/tests/system_tests_http1_adaptor.py
+++ b/tests/system_tests_http1_adaptor.py
@@ -267,6 +267,7 @@ class ThreadedTestClient(object):
self._thread = Thread(target=self._run)
self._thread.daemon = True
self.error = None
+ self.count = 0
self._thread.start()
def _run(self):
@@ -294,6 +295,7 @@ class ThreadedTestClient(object):
% "body present!")
self.error = "error: body present!"
return
+ self.count += 1
client.close()
self._logger.log("TestClient to %s closed" % self._conn_addr)
@@ -972,6 +974,147 @@ class Http1AdaptorEdge2EdgeTest(TestCase):
server11.wait()
server10.wait()
+ def test_02_credit_replenish(self):
+ """
+ Verify credit is replenished by sending > the default credit window
+ requests across the routers. The default credit window is 250
+ """
+
+ TESTS = {
+ "GET": [
+ (RequestMsg("GET", "/GET/test_02_credit_replenish",
+ headers={"Content-Length": "000"}),
+ ResponseMsg(200, reason="OK",
+ headers={"Content-Length": "24",
+ "Content-Type": "text/plain;charset=utf-8"},
+ body=b'test_02_credit_replenish'),
+ ResponseValidator(status=200),
+ ),
+ ]
+ }
+ server = TestServer(server_port=self.http_server11_port,
+ client_port=self.http_listener11_port,
+ tests=TESTS)
+ self.EA2.wait_connectors()
+
+ client = ThreadedTestClient(TESTS,
+ self.http_listener11_port,
+ repeat=300)
+ client.wait()
+ self.assertIsNone(client.error)
+ self.assertEqual(300, client.count)
+ server.wait()
+
+ def test_03_server_reconnect(self):
+ """
+ Verify server reconnect logic.
+ """
+ TESTS = {
+ "GET": [
+ (RequestMsg("GET", "/GET/test_03_server_reconnect",
+ headers={"Content-Length": "000"}),
+ ResponseMsg(200, reason="OK",
+ headers={"Content-Length": "24",
+ "Content-Type": "text/plain;charset=utf-8"},
+ body=b'test_03_server_reconnect'),
+ ResponseValidator(status=200),
+ ),
+ ]
+ }
+
+ # bring up the server and send some requests. This will cause the
+ # router to grant credit for clients
+ server = TestServer(server_port=self.http_server11_port,
+ client_port=self.http_listener11_port,
+ tests=TESTS)
+ self.EA2.wait_connectors()
+
+ client = ThreadedTestClient(TESTS,
+ self.http_listener11_port,
+ repeat=2)
+ client.wait()
+ self.assertIsNone(client.error)
+ self.assertEqual(2, client.count)
+
+ # simulate server loss. Fire up a client which should be granted
+ # credit since the adaptor does not immediately teardown the server
+ # links. This will cause the adaptor to run qdr_connection_process
+ # without a raw connection available to wake the I/O thread.
+ server.wait()
+ client = ThreadedTestClient(TESTS,
+ self.http_listener11_port,
+ repeat=2)
+ # the adaptor will detach the links to the server if the connection
+ # cannot be reestablished after 2.5 seconds. Restart the server before
+ # that occurs to prevent client messages from being released
+ server = TestServer(server_port=self.http_server11_port,
+ client_port=self.http_listener11_port,
+ tests=TESTS)
+ client.wait()
+ self.assertIsNone(client.error)
+ self.assertEqual(2, client.count)
+ server.wait()
+
+ def test_04_server_pining_for_the_fjords(self):
+ """
+ Test permanent loss of server
+ """
+ TESTS = {
+ "GET": [
+ (RequestMsg("GET", "/GET/test_04_fjord_pining",
+ headers={"Content-Length": "000"}),
+ ResponseMsg(200, reason="OK",
+ headers={"Content-Length": "20",
+ "Content-Type": "text/plain;charset=utf-8"},
+ body=b'test_04_fjord_pining'),
+ ResponseValidator(status=200),
+ ),
+ ]
+ }
+
+ # bring up the server and send some requests. This will cause the
+ # router to grant credit for clients
+ server = TestServer(server_port=self.http_server11_port,
+ client_port=self.http_listener11_port,
+ tests=TESTS)
+ self.EA2.wait_connectors()
+
+ client = ThreadedTestClient(TESTS, self.http_listener11_port)
+ client.wait()
+ self.assertIsNone(client.error)
+ self.assertEqual(1, client.count)
+
+ TESTS_FAIL = {
+ "GET": [
+ (RequestMsg("GET", "/GET/test_04_fjord_pining",
+ headers={"Content-Length": "000"}),
+ ResponseMsg(200, reason="OK",
+ headers={"Content-Length": "20",
+ "Content-Type": "text/plain;charset=utf-8"},
+ body=b'test_04_fjord_pining'),
+ ResponseValidator(status=503),
+ ),
+ ]
+ }
+
+ # Kill the server then issue client requests, expect 503 response
+ server.wait()
+ client = ThreadedTestClient(TESTS_FAIL, self.http_listener11_port)
+ client.wait()
+ self.assertIsNone(client.error)
+ self.assertEqual(1, client.count)
+
+ # ensure links recover once the server re-appears
+ server = TestServer(server_port=self.http_server11_port,
+ client_port=self.http_listener11_port,
+ tests=TESTS)
+ self.EA2.wait_connectors()
+
+ client = ThreadedTestClient(TESTS, self.http_listener11_port)
+ client.wait()
+ self.assertIsNone(client.error)
+ self.assertEqual(1, client.count)
+
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org