You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2020/10/29 14:31:39 UTC

[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1816: prevent activation from running after reconnecting DISPATCH-1791: avoid leaking qdr_delivery_t

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
     new e075b43  DISPATCH-1816: prevent activation from running after reconnecting DISPATCH-1791: avoid leaking qdr_delivery_t
e075b43 is described below

commit e075b4390fc3de2c8a3acdbf14315532165883f1
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Tue Oct 27 13:59:22 2020 -0400

    DISPATCH-1816: prevent activation from running after reconnecting
    DISPATCH-1791: avoid leaking qdr_delivery_t
    
    Move the adaptor finalize prior to stopping the core thread.  This
    allows the core thread to release qdr_delivery_t that were decref'ed
    during the adaptor finalize code.
    
    This closes #898
---
 src/adaptors/http1/http1_adaptor.c  |  36 ++--
 src/adaptors/http1/http1_client.c   |  25 ++-
 src/adaptors/http1/http1_private.h  |   7 +-
 src/adaptors/http1/http1_server.c   | 330 ++++++++++++++++++++++--------------
 src/router_core/router_core.c       |   4 +-
 tests/system_tests_http1_adaptor.py | 143 ++++++++++++++++
 6 files changed, 395 insertions(+), 150 deletions(-)

diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
index eb8349a..fe9581b 100644
--- a/src/adaptors/http1/http1_adaptor.c
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -74,6 +74,7 @@ void qdr_http1_request_base_cleanup(qdr_http1_request_base_t *hreq)
         DEQ_REMOVE(hreq->hconn->requests, hreq);
         h1_codec_request_state_cancel(hreq->lib_rs);
         free(hreq->response_addr);
+        free(hreq->site);
     }
 }
 
@@ -82,21 +83,24 @@ void qdr_http1_connection_free(qdr_http1_connection_t *hconn)
 {
     if (hconn) {
         pn_raw_connection_t *rconn = 0;
-        qd_timer_t *t1 = 0;
-        qd_timer_t *t2 = 0;
 
         // prevent core from activating this connection while it is being torn
-        // down.  see _core_connection_activate_CT
+        // down. Also prevent timer callbacks from running. see
+        // _core_connection_activate_CT, and _do_reconnect/_do_activate in
+        // http1_server.c
         //
         sys_mutex_lock(qdr_http1_adaptor->lock);
         {
             DEQ_REMOVE(qdr_http1_adaptor->connections, hconn);
-            t1 = hconn->server.reconnect_timer;
+            qd_timer_free(hconn->server.reconnect_timer);
             hconn->server.reconnect_timer = 0;
-            t2 = hconn->server.activate_timer;
+            qd_timer_free(hconn->server.activate_timer);
             hconn->server.activate_timer = 0;
             rconn = hconn->raw_conn;
             hconn->raw_conn = 0;
+            if (hconn->qdr_conn)
+                qdr_connection_set_context(hconn->qdr_conn, 0);
+            hconn->qdr_conn = 0;
         }
         sys_mutex_unlock(qdr_http1_adaptor->lock);
 
@@ -107,9 +111,6 @@ void qdr_http1_connection_free(qdr_http1_connection_t *hconn)
         else
             qdr_http1_client_conn_cleanup(hconn);
 
-        qd_timer_free(t1);
-        qd_timer_free(t2);
-
         h1_codec_connection_free(hconn->http_conn);
         if (rconn) {
             pn_raw_connection_set_context(rconn, 0);
@@ -231,6 +232,7 @@ void qdr_http1_error_response(qdr_http1_request_base_t *hreq,
     if (hreq->lib_rs) {
         bool ignored;
         h1_codec_tx_response(hreq->lib_rs, error_code, reason, 1, 1);
+        h1_codec_tx_add_header(hreq->lib_rs, "Content-Length", "0");
         h1_codec_tx_done(hreq->lib_rs, &ignored);
     }
 }
@@ -606,22 +608,24 @@ static void _core_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t d
     }
 }
 
+
+// This is invoked by management to initiate the connection close process.
+// Once the raw conn is closed (DISCONNECT event) we call qdr_connection_closed()
+// to finish the close processing
+//
 static void _core_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error)
 {
     qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) qdr_connection_get_context(conn);
     if (hconn) {
+        assert(hconn->qdr_conn == conn);
         qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
                "[C%"PRIu64"] HTTP/1.x closing connection", hconn->conn_id);
 
         char *qdr_error = error ? qdr_error_description(error) : 0;
-        qdr_http1_close_connection(hconn, qdr_error);
-
-        sys_mutex_lock(qdr_http1_adaptor->lock);
-        qdr_connection_set_context(conn, 0);
-        sys_mutex_unlock(qdr_http1_adaptor->lock);
-
-        hconn->qdr_conn = 0;
-        hconn->in_link = hconn->out_link = 0;
+        if (hconn->type == HTTP1_CONN_SERVER)
+            qdr_http1_server_core_conn_close(qdr_http1_adaptor, hconn, qdr_error);
+        else
+            qdr_http1_client_core_conn_close(qdr_http1_adaptor, hconn, qdr_error);
         free(qdr_error);
     }
 }
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 0a82599..7509f6c 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -385,15 +385,14 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
 
         if (hconn->out_link) {
             qdr_link_set_context(hconn->out_link, 0);
-            qdr_link_detach(hconn->out_link, QD_CLOSED, 0);
             hconn->out_link = 0;
         }
         if (hconn->in_link) {
             qdr_link_set_context(hconn->in_link, 0);
-            qdr_link_detach(hconn->in_link, QD_CLOSED, 0);
             hconn->in_link = 0;
         }
         if (hconn->qdr_conn) {
+            qdr_connection_set_context(hconn->qdr_conn, 0);
             qdr_connection_closed(hconn->qdr_conn);
             hconn->qdr_conn = 0;
         }
@@ -995,9 +994,11 @@ static bool _encode_response_headers(_client_request_t *hreq,
     bool ok = false;
     qd_message_t *msg = qdr_delivery_message(rmsg->dlv);
 
-    qd_iterator_t *group_id_itr = qd_message_field_iterator(msg, QD_FIELD_GROUP_ID);
-    hreq->base.site = (char*) qd_iterator_copy(group_id_itr);
-    qd_iterator_free(group_id_itr);
+    if (!hreq->base.site) {
+        qd_iterator_t *group_id_itr = qd_message_field_iterator(msg, QD_FIELD_GROUP_ID);
+        hreq->base.site = (char*) qd_iterator_copy(group_id_itr);
+        qd_iterator_free(group_id_itr);
+    }
 
     qd_iterator_t *app_props_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES);
     if (app_props_iter) {
@@ -1319,3 +1320,17 @@ void qdr_http1_client_conn_cleanup(qdr_http1_connection_t *hconn)
         _client_request_free(hreq);
     }
 }
+
+
+// handle connection close request from management
+//
+void qdr_http1_client_core_conn_close(qdr_http1_adaptor_t *adaptor,
+                                      qdr_http1_connection_t *hconn,
+                                      const char *error)
+{
+    // initiate close of the raw conn.  the adaptor will call
+    // qdr_connection_closed() and clean up once the DISCONNECT
+    // event is processed
+    //
+    qdr_http1_close_connection(hconn, error);
+}
diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h
index 593803d..d19fac7 100644
--- a/src/adaptors/http1/http1_private.h
+++ b/src/adaptors/http1/http1_private.h
@@ -247,7 +247,9 @@ void qdr_http1_client_core_delivery_update(qdr_http1_adaptor_t      *adaptor,
                                            uint64_t                  disp,
                                            bool                      settled);
 void qdr_http1_client_conn_cleanup(qdr_http1_connection_t *hconn);
-
+void qdr_http1_client_core_conn_close(qdr_http1_adaptor_t *adaptor,
+                                      qdr_http1_connection_t *hconn,
+                                      const char *error);
 // http1_server.c protocol adaptor callbacks
 //
 void qdr_http1_server_core_link_flow(qdr_http1_adaptor_t    *adaptor,
@@ -266,6 +268,9 @@ void qdr_http1_server_core_delivery_update(qdr_http1_adaptor_t      *adaptor,
                                            uint64_t                  disp,
                                            bool                      settled);
 void qdr_http1_server_conn_cleanup(qdr_http1_connection_t *hconn);
+void qdr_http1_server_core_conn_close(qdr_http1_adaptor_t *adaptor,
+                                      qdr_http1_connection_t *hconn,
+                                      const char *error);
 
 // recording of stats:
 void qdr_http1_record_client_request_info(qdr_http1_adaptor_t *adaptor, qdr_http1_request_base_t *request);
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index 98af72a..77da28f 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -363,20 +363,65 @@ static void _teardown_server_links(qdr_http1_connection_t *hconn)
 }
 
 
+//
+// A note about reconnect and activate timer handlers:
+//
+// Both _do_reconnect and _do_activate are run via separate qd_timers.
+// qd_timers execute on an arbitrary I/O thread and are guaranteed NOT to be
+// run in parallel.  The _do_activate timer is started by the core thread via
+// _core_connection_activate_CT (http1_adaptor.c).  The _do_reconnect timer is
+// started by the I/O thread handling the server raw connection
+// PN_RAW_CONNECTION_DISCONNECTED event.
+//
+// Since the server PN_RAW_CONNECTION_DISCONNECTED handler releases the raw
+// connection and at a later point in time _do_reconnect creates a new raw
+// connection it is guaranteed that _do_reconnect will NOT run in parallel with
+// an I/O thread running the raw connection event handler (since no such raw
+// connection exists when _do_reconnect is run)
+//
+// However it is possible to have a race between an I/O thread running
+// _do_activate and an I/O thread running the raw connection event handler IF
+// _do_activate runs _after_ _do_reconnect has run (since a new raw connection
+// is created and can be immediately scheduled).
+//
+// To avoid this race, the _do_reconnect handler cancels the _do_activate timer
+// to prevent it from running immediately after _do_reconnect completes
+// (remember: timer handlers never run in parallel).  To prevent the core
+// thread from rescheduling _do_activate after _do_reconnect runs, a lock is
+// held by _do_reconnect while it sets hconn->raw_conn.
+//
+
+
 // This adapter attempts to keep the connection to the server up as long as the
 // connector is configured.  This is called via a timer scheduled when the
 // PN_CONNECTION_CLOSE event is handled.
+// (See above note)
 //
 static void _do_reconnect(void *context)
 {
     qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context;
+    bool connecting = false;
+
+    // lock out core activation
+    sys_mutex_lock(qdr_http1_adaptor->lock);
+
+    // prevent _do_activate() from trying to process the qdr_connection after
+    // we schedule the raw connection on another thread
+    if (hconn->server.activate_timer)
+        qd_timer_cancel(hconn->server.activate_timer);
     if (!hconn->raw_conn) {
-        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] Connecting to HTTP server...", hconn->conn_id);
+        connecting = true;
         hconn->raw_conn = pn_raw_connection();
         pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
         // this call may reschedule the connection on another I/O thread:
         pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port);
     }
+
+    sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+    if (connecting)
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+               "[C%"PRIu64"] Connecting to HTTP server...", hconn->conn_id);
 }
 
 
@@ -385,12 +430,19 @@ static void _do_reconnect(void *context)
 // connection.  If the core needs to process the qdr_connection_t when there is
 // no raw connection to wake this zero-length timer handler will perform the
 // connection processing (under the I/O thread).
+// (See above note)
 //
 static void _do_activate(void *context)
 {
     qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context;
-    if (hconn->qdr_conn) {
+    if (!hconn->raw_conn && hconn->qdr_conn) {
         while (qdr_connection_process(hconn->qdr_conn)) {}
+        if (!hconn->qdr_conn) {
+            // the qdr_connection_t has been closed
+            qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+                   "[C%"PRIu64"] HTTP/1.x server connection closed", hconn->conn_id);
+            qdr_http1_connection_free(hconn);
+        }
     }
 }
 
@@ -431,15 +483,10 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
         pn_raw_connection_set_context(hconn->raw_conn, 0);
         hconn->close_connection = false;
 
-        if (!hconn->qdr_conn) {
-            // the router has closed this connection so do not try to
-            // re-establish it
-            qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connection closed", hconn->conn_id);
-            hconn->raw_conn = 0;
-            qdr_http1_connection_free(hconn);
-            return;
-        }
+        qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connection closed", hconn->conn_id);
 
+        // prevent core from activating raw conn since it will no longer exist
+        // on return from the handler
         sys_mutex_lock(qdr_http1_adaptor->lock);
         hconn->raw_conn = 0;
         sys_mutex_unlock(qdr_http1_adaptor->lock);
@@ -454,19 +501,22 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
             _cancel_request(hreq);
         }
 
-        // reconnect to the server. Leave the links intact so pending requests
-        // are not aborted.  Once we've failed to reconnect after MAX_RECONNECT
-        // tries drop the links to prevent additional request from arriving.
-        //
-        qd_duration_t nap_time = RETRY_PAUSE_MSEC * hconn->server.reconnect_count;
-        if (hconn->server.reconnect_count == MAX_RECONNECT) {
-            qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Server not responding - disconnecting...", hconn->conn_id);
-            _teardown_server_links(hconn);
-        } else {
-            hconn->server.reconnect_count += 1;  // increase next sleep interval
+        if (hconn->qdr_conn) {
+            //
+            // reconnect to the server. Leave the links intact so pending requests
+            // are not aborted.  Once we've failed to reconnect after MAX_RECONNECT
+            // tries, drop the links to prevent additional requests from arriving.
+            //
+            qd_duration_t nap_time = RETRY_PAUSE_MSEC * hconn->server.reconnect_count;
+            if (hconn->server.reconnect_count == MAX_RECONNECT) {
+                qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Server not responding - disconnecting...", hconn->conn_id);
+                _teardown_server_links(hconn);
+            } else {
+                hconn->server.reconnect_count += 1;  // increase next sleep interval
+            }
+            qd_timer_schedule(hconn->server.reconnect_timer, nap_time);
         }
-        qd_timer_schedule(hconn->server.reconnect_timer, nap_time);
-        return;
+        break;
     }
     case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need write buffers", hconn->conn_id);
@@ -523,10 +573,19 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
         break;
     }
 
-    // remove me:
-    if (hconn) {
+    //
+    // After each event check connection and request status
+    //
+    if (!hconn->qdr_conn) {
+        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP/1.x server connection closed", hconn->conn_id);
+        qdr_http1_connection_free(hconn);
+
+    } else {
+
+        bool need_close = false;
         _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
         if (hreq) {
+            // remove me:
             qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP is server request complete????", hconn->conn_id);
             qd_log(log, QD_LOG_DEBUG, "   codec_completed=%s cancelled=%s",
                    hreq->codec_completed ? "Complete" : "Not Complete",
@@ -538,124 +597,119 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
                    (int) DEQ_SIZE(hreq->out_data.fifo),
                    qdr_http1_out_data_buffers_outstanding(&hreq->out_data),
                    (int) DEQ_SIZE(hreq->responses));
-        }
-    }
 
-    // Check for completed or cancelled requests
+            // Check for completed or cancelled requests
 
-    bool need_close = false;
-    _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
-    if (hreq) {
-
-        if (hreq->cancelled) {
+            if (hreq->cancelled) {
 
-            // request:  have to wait until all buffers returned from proton
-            // before we can release the request delivery...
-            if (qdr_http1_out_data_buffers_outstanding(&hreq->out_data))
-                return;
-
-            if (hreq->request_dlv) {
-                // let the message drain... (TODO@(kgiusti) is this necessary?
-                if (!qdr_delivery_receive_complete(hreq->request_dlv))
+                // request:  have to wait until all buffers returned from proton
+                // before we can release the request delivery...
+                if (qdr_http1_out_data_buffers_outstanding(&hreq->out_data))
                     return;
 
-                uint64_t dispo = hreq->request_dispo ? hreq->request_dispo : PN_MODIFIED;
-                qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
-                                                  hreq->request_dlv,
-                                                  dispo,
-                                                  true,   // settled
-                                                  0,      // error
-                                                  0,      // dispo data
-                                                  false);
-                qdr_delivery_set_context(hreq->request_dlv, 0);
-                qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request cancelled");
-                hreq->request_dlv = 0;
-            }
-
-            _server_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
-            while (rmsg) {
-                if (rmsg->dlv) {
-                    qd_message_set_receive_complete(qdr_delivery_message(rmsg->dlv));
-                    qdr_delivery_set_aborted(rmsg->dlv, true);
+                if (hreq->request_dlv) {
+                    // let the message drain... (TODO@(kgiusti) is this necessary?
+                    if (!qdr_delivery_receive_complete(hreq->request_dlv))
+                        return;
+
+                    uint64_t dispo = hreq->request_dispo ? hreq->request_dispo : PN_MODIFIED;
+                    qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+                                                      hreq->request_dlv,
+                                                      dispo,
+                                                      true,   // settled
+                                                      0,      // error
+                                                      0,      // dispo data
+                                                      false);
+                    qdr_delivery_set_context(hreq->request_dlv, 0);
+                    qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request cancelled");
+                    hreq->request_dlv = 0;
                 }
-                _server_response_msg_free(hreq, rmsg);
-                rmsg = DEQ_HEAD(hreq->responses);
-            }
 
-            // The state of the connection to the server will be unknown if
-            // this request was not completed.
-            if (!hreq->codec_completed && hreq->base.out_http1_octets > 0)
-                need_close = true;
+                _server_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
+                while (rmsg) {
+                    if (rmsg->dlv) {
+                        qd_message_set_receive_complete(qdr_delivery_message(rmsg->dlv));
+                        qdr_delivery_set_aborted(rmsg->dlv, true);
+                    }
+                    _server_response_msg_free(hreq, rmsg);
+                    rmsg = DEQ_HEAD(hreq->responses);
+                }
 
-            _server_request_free(hreq);
+                // The state of the connection to the server will be unknown if
+                // this request was not completed.
+                if (!hreq->codec_completed && hreq->base.out_http1_octets > 0)
+                    need_close = true;
 
-        } else {
-
-            // Can the request disposition be updated?  Disposition can be
-            // updated after the entire encoded request has been written to the
-            // server.
-            if (!hreq->request_acked &&
-                hreq->request_encoded &&
-                DEQ_SIZE(hreq->out_data.fifo) == 0) {
-
-                qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
-                                                  hreq->request_dlv,
-                                                  hreq->request_dispo,
-                                                  false,   // settled
-                                                  0,      // error
-                                                  0,      // dispo data
-                                                  false);
-                hreq->request_acked = true;
-            }
-
-            // Can we settle request?  Settle the request delivery after all
-            // response messages have been received from the server
-            // (codec_complete).  Note that the responses may not have finished
-            // being delivered to the core (lack of credit, etc.)
-            //
-            if (!hreq->request_settled &&
-                hreq->request_acked &&  // implies out_data done
-                hreq->codec_completed) {
-
-                qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
-                                                  hreq->request_dlv,
-                                                  hreq->request_dispo,
-                                                  true,   // settled
-                                                  0,      // error
-                                                  0,      // dispo data
-                                                  false);
-                // can now release the delivery
-                qdr_delivery_set_context(hreq->request_dlv, 0);
-                qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request settled");
-                hreq->request_dlv = 0;
-
-                hreq->request_settled = true;
-            }
+                _server_request_free(hreq);
 
-            // Has the entire request/response completed?  It is complete after
-            // the request message has been settled and all responses have been
-            // delivered to the core.
-            //
-            if (hreq->request_acked &&
-                hreq->request_settled &&
-                DEQ_SIZE(hreq->responses) == 0) {
+            } else {
+
+                // Can the request disposition be updated?  Disposition can be
+                // updated after the entire encoded request has been written to the
+                // server.
+                if (!hreq->request_acked &&
+                    hreq->request_encoded &&
+                    DEQ_SIZE(hreq->out_data.fifo) == 0) {
+
+                    qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+                                                      hreq->request_dlv,
+                                                      hreq->request_dispo,
+                                                      false,   // settled
+                                                      0,      // error
+                                                      0,      // dispo data
+                                                      false);
+                    hreq->request_acked = true;
+                }
 
-                qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request completed!", hconn->conn_id);
-                _server_request_free(hreq);
+                // Can we settle request?  Settle the request delivery after all
+                // response messages have been received from the server
+                // (codec_complete).  Note that the responses may not have finished
+                // being delivered to the core (lack of credit, etc.)
+                //
+                if (!hreq->request_settled &&
+                    hreq->request_acked &&  // implies out_data done
+                    hreq->codec_completed) {
+
+                    qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+                                                      hreq->request_dlv,
+                                                      hreq->request_dispo,
+                                                      true,   // settled
+                                                      0,      // error
+                                                      0,      // dispo data
+                                                      false);
+                    // can now release the delivery
+                    qdr_delivery_set_context(hreq->request_dlv, 0);
+                    qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request settled");
+                    hreq->request_dlv = 0;
+
+                    hreq->request_settled = true;
+                }
 
-                // coverity ignores the fact that _server_request_free() calls
-                // the base cleanup which removes hreq from hconn->requests.
-                // coverity[use_after_free]
-                hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
-                if (hreq)
-                    _write_pending_request(hreq);
+                // Has the entire request/response completed?  It is complete after
+                // the request message has been settled and all responses have been
+                // delivered to the core.
+                //
+                if (hreq->request_acked &&
+                    hreq->request_settled &&
+                    DEQ_SIZE(hreq->responses) == 0) {
+
+                    qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request completed!", hconn->conn_id);
+                    _server_request_free(hreq);
+
+                    // coverity ignores the fact that _server_request_free() calls
+                    // the base cleanup which removes hreq from hconn->requests.
+                    // coverity[use_after_free]
+                    hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+                    if (hreq)
+                        _write_pending_request(hreq);
+                }
             }
         }
-    }
 
-    if (need_close) {
-        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Closing connection!", hconn->conn_id);
-        qdr_http1_close_connection(hconn, "Request cancelled");
+        if (need_close) {
+            qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Closing connection!", hconn->conn_id);
+            qdr_http1_close_connection(hconn, "Request cancelled");
+        }
     }
 }
 
@@ -1435,3 +1489,25 @@ static void _cancel_request(_server_request_t *hreq)
 
     // cleanup occurs at the end of the connection event handler
 }
+
+
+// handle connection close request from management
+//
+void qdr_http1_server_core_conn_close(qdr_http1_adaptor_t *adaptor,
+                                      qdr_http1_connection_t *hconn,
+                                      const char *error)
+{
+    qdr_connection_t *qdr_conn = hconn->qdr_conn;
+
+    // prevent activation by core thread
+    sys_mutex_lock(qdr_http1_adaptor->lock);
+    qdr_connection_set_context(hconn->qdr_conn, 0);
+    hconn->qdr_conn = 0;
+    sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+    qdr_connection_closed(qdr_conn);
+    qdr_http1_close_connection(hconn, "Connection closed by management");
+
+    // it is expected that this callback is the final callback before returning
+    // from qdr_connection_process(). Free hconn when qdr_connection_process returns.
+}
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index c6dd6e8..cce7eda 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -101,6 +101,9 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area,
 
 void qdr_core_free(qdr_core_t *core)
 {
+    // have adaptors clean up all core resources
+    qdr_adaptors_finalize(core);
+
     //
     // Stop and join the thread
     //
@@ -221,7 +224,6 @@ void qdr_core_free(qdr_core_t *core)
     // at this point all the conn identifiers have been freed
     qd_hash_free(core->conn_id_hash);
 
-    qdr_adaptors_finalize(core);
     qdr_modules_finalize(core);
 
     qdr_agent_free(core->mgmt_agent);
diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py
index 6b74e7b..3e83279 100644
--- a/tests/system_tests_http1_adaptor.py
+++ b/tests/system_tests_http1_adaptor.py
@@ -267,6 +267,7 @@ class ThreadedTestClient(object):
         self._thread = Thread(target=self._run)
         self._thread.daemon = True
         self.error = None
+        self.count = 0
         self._thread.start()
 
     def _run(self):
@@ -294,6 +295,7 @@ class ThreadedTestClient(object):
                                              % "body present!")
                             self.error = "error: body present!"
                             return
+                    self.count += 1
         client.close()
         self._logger.log("TestClient to %s closed" % self._conn_addr)
 
@@ -972,6 +974,147 @@ class Http1AdaptorEdge2EdgeTest(TestCase):
         server11.wait()
         server10.wait()
 
+    def test_02_credit_replenish(self):
+        """
+        Verify credit is replenished by sending more requests than the default
+        credit window across the routers.  The default credit window is 250.
+        """
+
+        TESTS = {
+            "GET": [
+                (RequestMsg("GET", "/GET/test_02_credit_replenish",
+                            headers={"Content-Length": "000"}),
+                 ResponseMsg(200, reason="OK",
+                             headers={"Content-Length": "24",
+                                      "Content-Type": "text/plain;charset=utf-8"},
+                             body=b'test_02_credit_replenish'),
+                 ResponseValidator(status=200),
+                ),
+            ]
+        }
+        server = TestServer(server_port=self.http_server11_port,
+                            client_port=self.http_listener11_port,
+                            tests=TESTS)
+        self.EA2.wait_connectors()
+
+        client = ThreadedTestClient(TESTS,
+                                    self.http_listener11_port,
+                                    repeat=300)
+        client.wait()
+        self.assertIsNone(client.error)
+        self.assertEqual(300, client.count)
+        server.wait()
+
+    def test_03_server_reconnect(self):
+        """
+        Verify server reconnect logic.
+        """
+        TESTS = {
+            "GET": [
+                (RequestMsg("GET", "/GET/test_03_server_reconnect",
+                            headers={"Content-Length": "000"}),
+                 ResponseMsg(200, reason="OK",
+                             headers={"Content-Length": "24",
+                                      "Content-Type": "text/plain;charset=utf-8"},
+                             body=b'test_03_server_reconnect'),
+                 ResponseValidator(status=200),
+                ),
+            ]
+        }
+
+        # bring up the server and send some requests. This will cause the
+        # router to grant credit for clients
+        server = TestServer(server_port=self.http_server11_port,
+                            client_port=self.http_listener11_port,
+                            tests=TESTS)
+        self.EA2.wait_connectors()
+
+        client = ThreadedTestClient(TESTS,
+                                    self.http_listener11_port,
+                                    repeat=2)
+        client.wait()
+        self.assertIsNone(client.error)
+        self.assertEqual(2, client.count)
+
+        # simulate server loss.  Fire up a client which should be granted
+        # credit since the adaptor does not immediately tear down the server
+        # links.  This will cause the adaptor to run qdr_connection_process
+        # without a raw connection available to wake the I/O thread.
+        server.wait()
+        client = ThreadedTestClient(TESTS,
+                                    self.http_listener11_port,
+                                    repeat=2)
+        # the adaptor will detach the links to the server if the connection
+        # cannot be reestablished after 2.5 seconds.  Restart the server before
+        # that occurs to prevent client messages from being released
+        server = TestServer(server_port=self.http_server11_port,
+                            client_port=self.http_listener11_port,
+                            tests=TESTS)
+        client.wait()
+        self.assertIsNone(client.error)
+        self.assertEqual(2, client.count)
+        server.wait()
+
+    def test_04_server_pining_for_the_fjords(self):
+        """
+        Test permanent loss of server
+        """
+        TESTS = {
+            "GET": [
+                (RequestMsg("GET", "/GET/test_04_fjord_pining",
+                            headers={"Content-Length": "000"}),
+                 ResponseMsg(200, reason="OK",
+                             headers={"Content-Length": "20",
+                                      "Content-Type": "text/plain;charset=utf-8"},
+                             body=b'test_04_fjord_pining'),
+                 ResponseValidator(status=200),
+                ),
+            ]
+        }
+
+        # bring up the server and send some requests. This will cause the
+        # router to grant credit for clients
+        server = TestServer(server_port=self.http_server11_port,
+                            client_port=self.http_listener11_port,
+                            tests=TESTS)
+        self.EA2.wait_connectors()
+
+        client = ThreadedTestClient(TESTS, self.http_listener11_port)
+        client.wait()
+        self.assertIsNone(client.error)
+        self.assertEqual(1, client.count)
+
+        TESTS_FAIL = {
+            "GET": [
+                (RequestMsg("GET", "/GET/test_04_fjord_pining",
+                            headers={"Content-Length": "000"}),
+                 ResponseMsg(200, reason="OK",
+                             headers={"Content-Length": "20",
+                                      "Content-Type": "text/plain;charset=utf-8"},
+                             body=b'test_04_fjord_pining'),
+                 ResponseValidator(status=503),
+                ),
+            ]
+        }
+
+        # Kill the server then issue client requests, expect 503 response
+        server.wait()
+        client = ThreadedTestClient(TESTS_FAIL, self.http_listener11_port)
+        client.wait()
+        self.assertIsNone(client.error)
+        self.assertEqual(1, client.count)
+
+        # ensure links recover once the server re-appears
+        server = TestServer(server_port=self.http_server11_port,
+                            client_port=self.http_listener11_port,
+                            tests=TESTS)
+        self.EA2.wait_connectors()
+
+        client = ThreadedTestClient(TESTS, self.http_listener11_port)
+        client.wait()
+        self.assertIsNone(client.error)
+        self.assertEqual(1, client.count)
+
 
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org