You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2020/12/09 03:11:38 UTC
[qpid-dispatch] branch dev-protocol-adaptors-2 updated:
DISPATCH-1869: discard invalid messages arriving from core DISPATCH-1859:
re-work server connection cleanup
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
new a11651b DISPATCH-1869: discard invalid messages arriving from core DISPATCH-1859: re-work server connection cleanup
a11651b is described below
commit a11651be8458d84cb119e4c526f12c563f3af158
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Fri Dec 4 10:35:56 2020 -0500
DISPATCH-1869: discard invalid messages arriving from core
DISPATCH-1859: re-work server connection cleanup
---
include/qpid/dispatch/http1_codec.h | 18 +-
src/adaptors/http1/http1_adaptor.c | 6 +-
src/adaptors/http1/http1_client.c | 114 ++++++-----
src/adaptors/http1/http1_codec.c | 106 +++++-----
src/adaptors/http1/http1_private.h | 1 -
src/adaptors/http1/http1_server.c | 392 ++++++++++++++++++++----------------
tests/system_tests_http1_adaptor.py | 171 +++++++++++++++-
7 files changed, 516 insertions(+), 292 deletions(-)
diff --git a/include/qpid/dispatch/http1_codec.h b/include/qpid/dispatch/http1_codec.h
index 132f06c..8aac8c4 100644
--- a/include/qpid/dispatch/http1_codec.h
+++ b/include/qpid/dispatch/http1_codec.h
@@ -147,15 +147,6 @@ typedef struct h1_codec_config_t {
h1_codec_connection_t *h1_codec_connection(h1_codec_config_t *config, void *context);
void *h1_codec_connection_get_context(h1_codec_connection_t *conn);
-// Notify the codec that the endpoint closed the connection. This should be
-// called for server connections only. Once the server has reconnected it is
-// safe to resume calling h1_codec_connection_rx_data(). This method is a
-// no-op for client connections. When a client connection closes the
-// application must cancel all outstanding requests and then call
-// h1_codec_connection_free() instead.
-//
-void h1_codec_connection_closed(h1_codec_connection_t *conn);
-
// Release the codec. This can only be done after all outstanding requests
// have been completed or cancelled.
//
@@ -169,6 +160,15 @@ void h1_codec_connection_free(h1_codec_connection_t *conn);
//
int h1_codec_connection_rx_data(h1_codec_connection_t *conn, qd_buffer_list_t *data, uintmax_t len);
+// Notify the codec that the endpoint closed the connection. For server-facing
+// connections it is safe to resume calling h1_codec_connection_rx_data() for
+// the h1_codec_connection once the connection to the server is reestablished.
+// Client-facing connections cannot be resumed after the connection has been
+// closed. In the client case the application must cancel all outstanding
+// requests and then call h1_codec_connection_free() instead.
+//
+void h1_codec_connection_rx_closed(h1_codec_connection_t *conn);
+
void h1_codec_request_state_set_context(h1_codec_request_state_t *hrs, void *context);
void *h1_codec_request_state_get_context(const h1_codec_request_state_t *hrs);
h1_codec_connection_t *h1_codec_request_state_get_connection(const h1_codec_request_state_t *hrs);
diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
index 9eb177f..931d268 100644
--- a/src/adaptors/http1/http1_adaptor.c
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -182,11 +182,9 @@ void qdr_http1_close_connection(qdr_http1_connection_t *hconn, const char *error
"[C%"PRIu64"] Connection closing: %s", hconn->conn_id, error);
}
- qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
- "[C%"PRIu64"] Initiating close of connection", hconn->conn_id);
-
if (hconn->raw_conn) {
- hconn->close_connection = true;
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"] Initiating close of connection", hconn->conn_id);
pn_raw_connection_close(hconn->raw_conn);
}
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 031135b..33412da 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -355,10 +355,10 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context;
qd_log_source_t *log = qdr_http1_adaptor->log;
- qd_log(log, QD_LOG_DEBUG, "RAW CONNECTION EVENT %s\n", pn_event_type_name(pn_event_type(e)));
-
if (!hconn) return;
+ qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] HTTP client proactor event %s", hconn->conn_id, pn_event_type_name(pn_event_type(e)));
+
switch (pn_event_type(e)) {
case PN_RAW_CONNECTION_CONNECTED: {
@@ -409,7 +409,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", hconn->conn_id);
// @TODO(kgiusti): backpressure if no credit
- if (hconn->client.reply_to_addr && !hconn->close_connection /* && hconn->in_link_credit > 0 */) {
+ if (hconn->client.reply_to_addr /* && hconn->in_link_credit > 0 */) {
int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
hconn->conn_id, granted);
@@ -451,7 +451,8 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
if (hconn) {
_client_request_t *hreq = (_client_request_t*) DEQ_HEAD(hconn->requests);
if (hreq) {
- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP is client request complete????", hconn->conn_id);
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP is client request msg-id=%"PRIu64" complete????",
+ hconn->conn_id, hreq->base.msg_id);
qd_log(log, QD_LOG_DEBUG, " codec=%s req-dlv=%p resp-dlv=%d req_msg=%p %s",
hreq->codec_completed ? "Done" : "Not Done",
(void*)hreq->request_dlv,
@@ -467,6 +468,9 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
_client_request_t *hreq = (_client_request_t *)DEQ_HEAD(hconn->requests);
if (hreq) {
if (hreq->cancelled) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP client request msg-id=%"PRIu64" cancelled",
+ hconn->conn_id, hconn->out_link_id, hreq->base.msg_id);
need_close = true;
} else {
// Can we retire the current outgoing response messages?
@@ -477,8 +481,8 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
DEQ_IS_EMPTY(rmsg->out_data.fifo)) {
// response message fully received and forwarded to client
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
- "[C%"PRIu64"][L%"PRIu64"] HTTP client settling response, dispo=0x%"PRIx64,
- hconn->conn_id, hconn->out_link_id, rmsg->dispo);
+ "[C%"PRIu64"][L%"PRIu64"] HTTP client request msg-id=%"PRIu64" settling response, dispo=0x%"PRIx64,
+ hconn->conn_id, hconn->out_link_id, hreq->base.msg_id, rmsg->dispo);
qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
rmsg->dlv,
rmsg->dispo,
@@ -495,7 +499,8 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
DEQ_IS_EMPTY(hreq->responses) &&
hreq->request_settled) {
- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request completed!", hconn->conn_id);
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request msg-id=%"PRIu64" completed!",
+ hconn->conn_id, hreq->base.msg_id);
need_close = hreq->close_on_complete;
_client_request_free(hreq);
@@ -512,8 +517,8 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
if (hreq->request_msg && hconn->in_link_credit > 0) {
qd_log(hconn->adaptor->log, QD_LOG_TRACE,
- "[C%"PRIu64"][L%"PRIu64"] Delivering request to router",
- hconn->conn_id, hconn->in_link_id);
+ "[C%"PRIu64"][L%"PRIu64"] Delivering next request msg-id=%"PRIu64" to router",
+ hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
hconn->in_link_credit -= 1;
hreq->request_dlv = qdr_link_deliver(hconn->in_link, hreq->request_msg, 0, false, 0, 0, 0, 0);
@@ -614,10 +619,6 @@ static int _client_rx_request_cb(h1_codec_request_state_t *hrs,
h1_codec_connection_t *h1c = h1_codec_request_state_get_connection(hrs);
qdr_http1_connection_t *hconn = (qdr_http1_connection_t*)h1_codec_connection_get_context(h1c);
- qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
- "[C%"PRIu64"] HTTP request received: method=%s target=%s version=%"PRIi32".%"PRIi32,
- hconn->conn_id, method, target, version_major, version_minor);
-
_client_request_t *creq = new__client_request_t();
ZERO(creq);
creq->base.start = qd_timer_now();
@@ -627,6 +628,10 @@ static int _client_rx_request_cb(h1_codec_request_state_t *hrs,
creq->close_on_complete = (version_minor == 0);
DEQ_INIT(creq->responses);
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"] HTTP request received: msg-id=%"PRIu64" method=%s target=%s version=%"PRIi32".%"PRIi32,
+ hconn->conn_id, creq->base.msg_id, method, target, version_major, version_minor);
+
creq->request_props = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
qd_compose_start_map(creq->request_props);
{
@@ -761,8 +766,8 @@ static int _client_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo
hconn->in_link_credit -= 1;
qd_log(hconn->adaptor->log, QD_LOG_TRACE,
- "[C%"PRIu64"][L%"PRIu64"] Delivering request to router",
- hconn->conn_id, hconn->in_link_id);
+ "[C%"PRIu64"][L%"PRIu64"] Delivering request msg-id=%"PRIu64" to router",
+ hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
hreq->request_dlv = qdr_link_deliver(hconn->in_link, hreq->request_msg, 0, false, 0, 0, 0, 0);
qdr_delivery_set_context(hreq->request_dlv, (void*) hreq);
@@ -811,8 +816,8 @@ static void _client_rx_done_cb(h1_codec_request_state_t *hrs)
qd_message_t *msg = hreq->request_msg ? hreq->request_msg : qdr_delivery_message(hreq->request_dlv);
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
- "[C%"PRIu64"][L%"PRIu64"] HTTP request receive complete.",
- hconn->conn_id, hconn->in_link_id);
+ "[C%"PRIu64"][L%"PRIu64"] HTTP request msg-id=%"PRIu64" receive complete.",
+ hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
if (!qd_message_receive_complete(msg)) {
qd_message_set_receive_complete(msg);
@@ -838,8 +843,9 @@ static void _client_request_complete_cb(h1_codec_request_state_t *lib_rs, bool c
uint64_t in_octets, out_octets;
h1_codec_request_state_counters(lib_rs, &in_octets, &out_octets);
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
- "[C%"PRIu64"] HTTP request/response %s. Octets read: %"PRIu64" written: %"PRIu64,
+ "[C%"PRIu64"] HTTP request msg-id=%"PRIu64" %s. Octets read: %"PRIu64" written: %"PRIu64,
hreq->base.hconn->conn_id,
+ hreq->base.msg_id,
cancelled ? "cancelled!" : "codec done",
in_octets, out_octets);
}
@@ -899,8 +905,8 @@ void qdr_http1_client_core_link_flow(qdr_http1_adaptor_t *adaptor,
hconn->in_link_credit -= 1;
qd_log(hconn->adaptor->log, QD_LOG_TRACE,
- "[C%"PRIu64"][L%"PRIu64"] Delivering request to router",
- hconn->conn_id, hconn->in_link_id);
+ "[C%"PRIu64"][L%"PRIu64"] Delivering next request msg-id=%"PRIu64" to router",
+ hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
hreq->request_dlv = qdr_link_deliver(hconn->in_link, hreq->request_msg, 0, false, 0, 0, 0, 0);
qdr_delivery_set_context(hreq->request_dlv, (void*) hreq);
@@ -923,30 +929,37 @@ void qdr_http1_client_core_delivery_update(qdr_http1_adaptor_t *adaptor,
assert(dlv == hreq->request_dlv);
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
- "[C%"PRIu64"][L%"PRIu64"] HTTP request delivery update, outcome=0x%"PRIx64"%s",
- hconn->conn_id, hconn->in_link_id, disp, settled ? " settled" : "");
+ "[C%"PRIu64"][L%"PRIu64"] HTTP request msg-id=%"PRIu64" delivery update, outcome=0x%"PRIx64"%s",
+ hconn->conn_id, hconn->in_link_id, hreq->base.msg_id, disp, settled ? " settled" : "");
if (disp && disp != PN_RECEIVED && hreq->request_dispo == 0) {
// terminal disposition
hreq->request_dispo = disp;
if (disp != PN_ACCEPTED) {
- // no response message is going to arrive. Now what? For now fake
- // a response from the server by using the codec to write an error
- // response on the behalf of the server.
- qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
- "[C%"PRIu64"][L%"PRIu64"] HTTP request failure, outcome=0x%"PRIx64,
- hconn->conn_id, hconn->in_link_id, disp);
- _client_response_msg_t *rmsg = new__client_response_msg_t();
- ZERO(rmsg);
- DEQ_INIT(rmsg->out_data.fifo);
- DEQ_INSERT_TAIL(hreq->responses, rmsg);
+ qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP request msg-id=%"PRIu64" failure, outcome=0x%"PRIx64,
+ hconn->conn_id, hconn->in_link_id, hreq->base.msg_id, disp);
+
+ if (DEQ_IS_EMPTY(hreq->responses)) {
+ // best effort attempt to send an error to the client
+ // if nothing has been sent back so far
+ _client_response_msg_t *rmsg = new__client_response_msg_t();
+ ZERO(rmsg);
+ DEQ_INIT(rmsg->out_data.fifo);
+ DEQ_INSERT_TAIL(hreq->responses, rmsg);
+
+ if (disp == PN_REJECTED) {
+ qdr_http1_error_response(&hreq->base, 400, "Bad Request");
+ } else {
+ // total guess as to what the proper error code should be
+ qdr_http1_error_response(&hreq->base, 503, "Service Unavailable");
+ }
+ hreq->close_on_complete = true; // trust nothing
- if (disp == PN_REJECTED) {
- qdr_http1_error_response(&hreq->base, 400, "Bad Request");
} else {
- // total guess as to what the proper error code should be
- qdr_http1_error_response(&hreq->base, 503, "Service Unavailable");
+ // partial response already sent - punt:
+ qdr_http1_close_connection(hconn, "HTTP request failed");
}
}
}
@@ -1102,20 +1115,7 @@ static uint64_t _encode_response_message(_client_request_t *hreq,
_client_response_msg_t *rmsg)
{
qdr_http1_connection_t *hconn = hreq->base.hconn;
- qd_message_t *msg = qdr_delivery_message(rmsg->dlv);
- qd_message_depth_status_t status = qd_message_check_depth(msg, QD_DEPTH_BODY);
-
- if (status == QD_MESSAGE_DEPTH_INCOMPLETE)
- return 0;
-
- if (status == QD_MESSAGE_DEPTH_INVALID) {
- qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
- "[C%"PRIu64"][L%"PRIu64"] body data depth check failed",
- hconn->conn_id, hconn->out_link_id);
- return PN_REJECTED;
- }
-
- assert(status == QD_MESSAGE_DEPTH_OK);
+ qd_message_t *msg = qdr_delivery_message(rmsg->dlv);
if (!rmsg->headers_encoded) {
rmsg->headers_encoded = true;
@@ -1159,6 +1159,9 @@ static uint64_t _encode_response_message(_client_request_t *hreq,
return PN_ACCEPTED;
case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] body data need more",
+ hconn->conn_id, hconn->out_link_id);
return 0; // wait for more
case QD_MESSAGE_STREAM_DATA_INVALID:
@@ -1184,6 +1187,9 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t *adaptor,
bool settled)
{
qd_message_t *msg = qdr_delivery_message(delivery);
+ if (qd_message_is_discard(msg))
+ return 0;
+
_client_request_t *hreq = (_client_request_t*) qdr_delivery_get_context(delivery);
if (!hreq) {
// new delivery - look for corresponding request via correlation_id
@@ -1196,6 +1202,7 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t *adaptor,
"[C%"PRIu64"][L%"PRIu64"] Malformed HTTP/1.x message",
hconn->conn_id, link->identity);
qd_message_set_send_complete(msg);
+ qd_message_set_discard(msg, true);
qdr_http1_close_connection(hconn, "Malformed response message");
return PN_REJECTED;
@@ -1207,6 +1214,7 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t *adaptor,
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] Discarding malformed message.", hconn->conn_id, link->identity);
qd_message_set_send_complete(msg);
+ qd_message_set_discard(msg, true);
qdr_http1_close_connection(hconn, "Cannot correlate response message");
return PN_REJECTED;
}
@@ -1219,6 +1227,9 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t *adaptor,
qdr_delivery_set_context(delivery, hreq);
qdr_delivery_incref(delivery, "referenced by HTTP1 adaptor");
DEQ_INSERT_TAIL(hreq->responses, rmsg);
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP received response for msg-id=%"PRIu64,
+ hconn->conn_id, hconn->out_link_id, hreq->base.msg_id);
break;
}
}
@@ -1238,6 +1249,7 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t *adaptor,
} else {
// The response was bad. There's not much that can be done to
// recover, so for now I punt...
+ qd_message_set_discard(msg, true);
qdr_http1_close_connection(hconn, "Cannot parse response message");
}
}
@@ -1272,7 +1284,7 @@ static void _client_response_msg_free(_client_request_t *req, _client_response_m
//
static void _write_pending_response(_client_request_t *hreq)
{
- if (hreq && !hreq->cancelled && !hreq->base.hconn->close_connection) {
+ if (hreq && !hreq->cancelled) {
assert(DEQ_PREV(&hreq->base) == 0); // must preserve order
_client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
if (rmsg && rmsg->out_data.write_ptr) {
diff --git a/src/adaptors/http1/http1_codec.c b/src/adaptors/http1/http1_codec.c
index 4940618..36d7ac1 100644
--- a/src/adaptors/http1/http1_codec.c
+++ b/src/adaptors/http1/http1_codec.c
@@ -107,6 +107,7 @@ struct h1_codec_request_state_t {
bool no_body_method; // true if request method is either HEAD or CONNECT
bool request_complete; // true when request message done encoding/decoding
bool response_complete; // true when response message done encoding/decoding
+ bool close_expected; // if true do not signal request_complete cb until closed
};
DEQ_DECLARE(h1_codec_request_state_t, h1_codec_request_state_list_t);
ALLOC_DECLARE(h1_codec_request_state_t);
@@ -239,39 +240,6 @@ h1_codec_connection_t *h1_codec_connection(h1_codec_config_t *config, void *cont
}
-// The connection has closed. If this is a connection to a server this may
-// simply be the end of the response message. Mark the in-flight response as
-// completed.
-//
-void h1_codec_connection_closed(h1_codec_connection_t *conn)
-{
- if (conn && conn->config.type == HTTP1_CONN_SERVER) {
-
- // is decoding a response in progress
- struct decoder_t *decoder = &conn->decoder;
- h1_codec_request_state_t *hrs = decoder->hrs;
- if (hrs && hrs->request_complete) {
- // the corresponding request msg is complete
- if (!hrs->response_complete) {
- hrs->response_complete = true;
- conn->config.rx_done(hrs);
- }
- conn->config.request_complete(hrs, false);
- decoder_reset(decoder);
- h1_codec_request_state_free(hrs);
- if (hrs == conn->encoder.hrs)
- encoder_reset(&conn->encoder);
- }
-
- // since the underlying connection is gone discard all remaining
- // incoming data
- decoder_reset(decoder);
- qd_buffer_list_free_buffers(&conn->decoder.incoming);
- decoder->read_ptr = NULL_I_PTR;
- }
-}
-
-
// Free the connection
//
void h1_codec_connection_free(h1_codec_connection_t *conn)
@@ -835,6 +803,17 @@ static bool process_headers_done(h1_codec_connection_t *conn, struct decoder_t *
has_body = false;
}
}
+
+ // In certain scenarios an HTTP server will close the connection to
+ // indicate the end of a response message. This may happen even if
+ // the request message has a known length (Content-Length or
+ // Transfer-Encoding). In these circumstances do NOT signal that
+ // the request is complete (call request_complete() callback) until
+ // the connection closes. Otherwise the user may start sending the
+ // next request message before the HTTP server closes the TCP
+ // connection. (see RFC7230, section Persistence)
+ hrs->close_expected = decoder->hdr_conn_close
+ || (decoder->is_http10 && !decoder->hdr_conn_keep_alive);
}
decoder->error = conn->config.rx_headers_done(decoder->hrs, has_body);
@@ -1257,25 +1236,12 @@ static bool parse_done(h1_codec_connection_t *conn, struct decoder_t *decoder)
// signal the message receive is complete
conn->config.rx_done(hrs);
- bool close_expected = false;
if (is_response) {
- // Informational 1xx response codes are NOT teriminal - further responses are allowed!
+ // Informational 1xx response codes are NOT terminal - further responses are allowed!
if (IS_INFO_RESPONSE(hrs->response_code)) {
hrs->response_code = 0;
} else {
hrs->response_complete = true;
-
- // In certain scenarios an HTTP server will close the connection to
- // indicate the end of a response message. This may happen even if
- // the request message has a known length (Content-Length or
- // Transfer-Encoding). In these circumstances do NOT signal that
- // the request is complete (call request_complete() callback) until
- // the connection closes. Otherwise the user may start sending the
- // next request message before the HTTP server closes the TCP
- // connection. (see RFC7230, section Persistence)
-
- close_expected = decoder->hdr_conn_close
- || (decoder->is_http10 && !decoder->hdr_conn_keep_alive);
}
} else {
hrs->request_complete = true;
@@ -1283,7 +1249,7 @@ static bool parse_done(h1_codec_connection_t *conn, struct decoder_t *decoder)
decoder_reset(decoder);
- if (!close_expected) {
+ if (!hrs->close_expected) {
if (hrs->request_complete && hrs->response_complete) {
conn->config.request_complete(hrs, false);
h1_codec_request_state_free(hrs);
@@ -1356,6 +1322,50 @@ int h1_codec_connection_rx_data(h1_codec_connection_t *conn, qd_buffer_list_t *d
}
+// The read channel of the connection has closed. If this is a connection to a
+// server this may simply be the end of the response message. If a message is
+// currently being decoded see if it is valid to complete.
+//
+void h1_codec_connection_rx_closed(h1_codec_connection_t *conn)
+{
+ if (conn && conn->config.type == HTTP1_CONN_SERVER) {
+
+ // terminate the in progress decode
+
+ struct decoder_t *decoder = &conn->decoder;
+ h1_codec_request_state_t *hrs = decoder->hrs;
+ if (hrs) {
+ // consider the response valid if length is unspecified since in
+ // this case the server must close the connection to complete the
+ // message body
+ if (decoder->state == HTTP1_MSG_STATE_BODY
+ && !decoder->is_chunked
+ && !decoder->hdr_content_length) {
+
+ if (!hrs->response_complete) {
+ hrs->response_complete = true;
+ conn->config.rx_done(hrs);
+ }
+ }
+ }
+
+ decoder_reset(decoder);
+ // since the underlying connection is gone discard all remaining
+ // incoming data
+ qd_buffer_list_free_buffers(&conn->decoder.incoming);
+ decoder->read_ptr = NULL_I_PTR;
+
+ // complete any "done" requests
+ hrs = DEQ_HEAD(conn->hrs_queue);
+ while (hrs && hrs->response_complete && hrs->request_complete) {
+ conn->config.request_complete(hrs, false);
+ h1_codec_request_state_free(hrs);
+ hrs = DEQ_HEAD(conn->hrs_queue);
+ }
+ }
+}
+
+
void h1_codec_request_state_set_context(h1_codec_request_state_t *hrs, void *context)
{
hrs->context = context;
diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h
index c6580d5..e321a1c 100644
--- a/src/adaptors/http1/http1_private.h
+++ b/src/adaptors/http1/http1_private.h
@@ -183,7 +183,6 @@ struct qdr_http1_connection_t {
// flags
//
bool trace;
- bool close_connection;
};
ALLOC_DECLARE(qdr_http1_connection_t);
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index 282b5d9..92ea0d1 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -69,7 +69,6 @@ typedef struct _server_request_t {
uint64_t request_dispo; // set by adaptor during encode
bool request_settled; // set by adaptor
bool request_acked; // true if dispo sent to core
- bool request_encoded; // true when encoding done
bool headers_encoded; // True when header encode done
qdr_http1_out_data_fifo_t out_data; // encoded request written to raw conn
@@ -79,6 +78,7 @@ typedef struct _server_request_t {
bool codec_completed; // Request and Response HTTP msgs OK
bool cancelled;
bool close_on_complete; // close the conn when this request is complete
+ bool response_complete; // true when server response message decoded
} _server_request_t;
ALLOC_DECLARE(_server_request_t);
ALLOC_DEFINE(_server_request_t);
@@ -125,6 +125,7 @@ static void _server_response_msg_free(_server_request_t *req, _server_response_m
static void _server_request_free(_server_request_t *hreq);
static void _write_pending_request(_server_request_t *req);
static void _cancel_request(_server_request_t *req);
+static bool _process_requests(qdr_http1_connection_t *hconn);
////////////////////////////////////////////////////////
@@ -409,6 +410,8 @@ static void _do_reconnect(void *context)
qdr_http1_connection_free(hconn);
return;
}
+
+ _process_requests(hconn);
}
// lock out core activation
@@ -432,10 +435,10 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context;
qd_log_source_t *log = qdr_http1_adaptor->log;
- qd_log(log, QD_LOG_DEBUG, "RAW CONNECTION EVENT %s\n", pn_event_type_name(pn_event_type(e)));
-
if (!hconn) return;
+ qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] HTTP server proactor event %s", hconn->conn_id, pn_event_type_name(pn_event_type(e)));
+
switch (pn_event_type(e)) {
case PN_RAW_CONNECTION_CONNECTED: {
@@ -447,32 +450,41 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
case PN_RAW_CONNECTION_CLOSED_READ: {
// notify the codec so it can complete the current response
// message (response body terminated on connection closed)
- h1_codec_connection_closed(hconn->http_conn);
+ h1_codec_connection_rx_closed(hconn->http_conn);
+
+ // if the response for the current request has not fully arrived cancel
+ // the request
+ _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+ if (hreq) {
+ if (hreq->base.out_http1_octets > 0) { // req msg written to server
+ if (!hreq->response_complete) {
+ _cancel_request(hreq);
+ }
+ }
+ }
+ pn_raw_connection_close(hconn->raw_conn);
+ break;
}
- // fall through
+
case PN_RAW_CONNECTION_CLOSED_WRITE: {
- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Closed for %s", hconn->conn_id,
- pn_event_type(e) == PN_RAW_CONNECTION_CLOSED_READ
- ? "reading" : "writing");
+ // cancel the current request if the request has not been fully written
+ // to the raw connection
+ _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+ if (hreq) {
+ if (hreq->base.out_http1_octets > 0) { // req msg written to server
+ if (!DEQ_IS_EMPTY(hreq->out_data.fifo)) {
+ _cancel_request(hreq);
+ }
+ }
+ }
pn_raw_connection_close(hconn->raw_conn);
break;
}
case PN_RAW_CONNECTION_DISCONNECTED: {
- pn_raw_connection_set_context(hconn->raw_conn, 0);
- hconn->close_connection = false;
-
qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connection closed", hconn->conn_id);
-
- // if the current request was not completed, cancel it. it's ok if
- // there are outstanding *response* deliveries in flight as long as the
- // response(s) have been completely received from the server
- // (request_complete == true).
-
- _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
- if (hreq && !hreq->codec_completed && hreq->base.out_http1_octets > 0) {
- _cancel_request(hreq);
- }
+ pn_raw_connection_set_context(hconn->raw_conn, 0);
+ _process_requests(hconn);
//
// reconnect to the server. Leave the links intact so pending requests
@@ -514,11 +526,9 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", hconn->conn_id);
// @TODO(kgiusti): backpressure if no credit
// if (hconn->in_link_credit > 0 */)
- if (!hconn->close_connection) {
- int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
- hconn->conn_id, granted);
- }
+ int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
+ hconn->conn_id, granted);
break;
}
case PN_RAW_CONNECTION_WAKE: {
@@ -568,136 +578,149 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
qdr_http1_connection_free(hconn);
} else {
+ bool need_close = _process_requests(hconn);
- bool need_close = false;
- _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
- if (hreq) {
- // remove me:
- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP is server request complete????", hconn->conn_id);
- qd_log(log, QD_LOG_DEBUG, " codec_completed=%s cancelled=%s",
- hreq->codec_completed ? "Complete" : "Not Complete",
- hreq->cancelled ? "Cancelled" : "Not Cancelled");
- qd_log(log, QD_LOG_DEBUG, " Req: dlv=%p dispo=%"PRIu64" settled=%d acked=%d",
- (void*) hreq->request_dlv, hreq->request_dispo, hreq->request_settled,
- hreq->request_acked);
- qd_log(log, QD_LOG_DEBUG, " Req: out_data=%d pton=%d resp-count=%d",
- (int) DEQ_SIZE(hreq->out_data.fifo),
- qdr_http1_out_data_buffers_outstanding(&hreq->out_data),
- (int) DEQ_SIZE(hreq->responses));
-
- // Check for completed or cancelled requests
-
- if (hreq->cancelled) {
-
- // request: have to wait until all buffers returned from proton
- // before we can release the request delivery...
- if (qdr_http1_out_data_buffers_outstanding(&hreq->out_data))
- return;
-
- if (hreq->request_dlv) {
- // let the message drain... (TODO@(kgiusti) is this necessary?
- if (!qdr_delivery_receive_complete(hreq->request_dlv))
- return;
-
- uint64_t dispo = hreq->request_dispo ? hreq->request_dispo : PN_MODIFIED;
- qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
- hreq->request_dlv,
- dispo,
- true, // settled
- 0, // error
- 0, // dispo data
- false);
- qdr_delivery_set_context(hreq->request_dlv, 0);
- qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request cancelled");
- hreq->request_dlv = 0;
- }
+ if (need_close) {
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Closing connection!", hconn->conn_id);
+ qdr_http1_close_connection(hconn, "HTTP Request requires connection close");
+ }
+ }
+}
- _server_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
- while (rmsg) {
- if (rmsg->dlv) {
- qd_message_set_receive_complete(qdr_delivery_message(rmsg->dlv));
- qdr_delivery_set_aborted(rmsg->dlv, true);
- }
- _server_response_msg_free(hreq, rmsg);
- rmsg = DEQ_HEAD(hreq->responses);
- }
- // The state of the connection to the server will be unknown if
- // this request was not completed.
- if (!hreq->codec_completed && hreq->base.out_http1_octets > 0)
- need_close = true;
+// See if the current request can be completed and the next pending request
+// started. Return true if the connection must be closed before starting the
+// next request.
+static bool _process_requests(qdr_http1_connection_t *hconn)
+{
+ bool need_close = false;
+ _server_request_t *next_hreq = 0;
+ _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
- _server_request_free(hreq);
+ if (!hreq)
+ return need_close;
- } else {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"] Processing current HTTP request msg-id=%"PRIu64", state=%s",
+ hconn->conn_id, hreq->base.msg_id,
+ hreq->codec_completed ? "codec complete"
+ : hreq->cancelled ? "request cancelled"
+ : "in-progress");
- // Can the request disposition be updated? Disposition can be
- // updated after the entire encoded request has been written to the
- // server.
- if (!hreq->request_acked &&
- hreq->request_encoded &&
- DEQ_SIZE(hreq->out_data.fifo) == 0) {
-
- qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
- hreq->request_dlv,
- hreq->request_dispo,
- false, // settled
- 0, // error
- 0, // dispo data
- false);
- hreq->request_acked = true;
- }
+ if (hreq->cancelled) {
- // Can we settle request? Settle the request delivery after all
- // response messages have been received from the server
- // (codec_complete). Note that the responses may not have finished
- // being delivered to the core (lack of credit, etc.)
- //
- if (!hreq->request_settled &&
- hreq->request_acked && // implies out_data done
- hreq->codec_completed) {
-
- qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
- hreq->request_dlv,
- hreq->request_dispo,
- true, // settled
- 0, // error
- 0, // dispo data
- false);
- // can now release the delivery
- qdr_delivery_set_context(hreq->request_dlv, 0);
- qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request settled");
- hreq->request_dlv = 0;
-
- hreq->request_settled = true;
- }
+ // clean up the request message delivery
+ if (hreq->request_dlv) {
+ qd_message_set_discard(qdr_delivery_message(hreq->request_dlv), true);
+
+ if (!hreq->request_acked || !hreq->request_settled) {
+
+ if (hreq->request_dispo == 0)
+ hreq->request_dispo = (hreq->base.out_http1_octets > 0
+ ? PN_MODIFIED : PN_RELEASED);
+
+ qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+ hreq->request_dlv,
+ hreq->request_dispo,
+ true, // settled
+ 0, // error
+ 0, // dispo data
+ false);
+ hreq->request_acked = hreq->request_settled = true;
+ }
+ qdr_delivery_set_context(hreq->request_dlv, 0);
+ qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request cancelled");
+ hreq->request_dlv = 0;
+ }
- // Has the entire request/response completed? It is complete after
- // the request message has been settled and all responses have been
- // delivered to the core.
- //
- if (hreq->request_acked &&
- hreq->request_settled &&
- DEQ_SIZE(hreq->responses) == 0) {
-
- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request completed!", hconn->conn_id);
- _server_request_free(hreq);
-
- // coverity ignores the fact that _server_request_free() calls
- // the base cleanup which removes hreq from hconn->requests.
- // coverity[use_after_free]
- hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
- if (hreq)
- _write_pending_request(hreq);
- }
+ // drop in flight responses
+ _server_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
+ while (rmsg) {
+ if (rmsg->dlv) {
+ qd_message_set_receive_complete(qdr_delivery_message(rmsg->dlv));
+ qdr_delivery_set_aborted(rmsg->dlv, true);
}
+ _server_response_msg_free(hreq, rmsg);
+ rmsg = DEQ_HEAD(hreq->responses);
}
- if (need_close) {
- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Closing connection!", hconn->conn_id);
- qdr_http1_close_connection(hconn, "Request cancelled");
+ // have to wait until all buffers have been returned from proton
+ // before we can release the request
+ if (qdr_http1_out_data_buffers_outstanding(&hreq->out_data))
+ return false;
+
+ // it is safe to keep the connection up if this request has never been
+ // written to the connection, otherwise the state of the connection is
+ // unknown so close it
+
+ if (hreq->base.out_http1_octets > 0)
+ need_close = true;
+ else
+ next_hreq = (_server_request_t*) DEQ_NEXT(&hreq->base);
+
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] HTTP request msg-id=%"PRIu64" cancelled",
+ hconn->conn_id, hreq->base.msg_id);
+ _server_request_free(hreq);
+
+ if (hconn->out_link)
+ qdr_link_flow(qdr_http1_adaptor->core, hconn->out_link, 1, false);
+
+
+ } else if (hreq->codec_completed) {
+
+ // The request message has been fully encoded and the response msg(s)
+ // have been completely received. The terminal disposition for the
+ // request message delivery can be set now since the server is done
+ // responding. The request disposition can be settled after all the
+ // response messages have been delivered to the core.
+
+ // DEQ_IS_EMPTY(hreq->out_data.fifo) ==> request message fully written to raw conn
+ // DEQ_IS_EMPTY(hreq->responses) ==> all response messages delivered to the core
+ if (!hreq->request_acked || (!hreq->request_settled
+ && DEQ_IS_EMPTY(hreq->responses))) {
+
+ assert(hreq->request_dlv);
+ assert(hreq->request_dispo == PN_ACCEPTED);
+ hreq->request_settled = DEQ_IS_EMPTY(hreq->responses);
+ qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+ hreq->request_dlv,
+ hreq->request_dispo,
+ hreq->request_settled,
+ 0, // error
+ 0, // dispo data
+ false);
+ hreq->request_acked = true;
+ if (hreq->request_settled) {
+ qdr_delivery_set_context(hreq->request_dlv, 0);
+ qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request settled");
+ hreq->request_dlv = 0;
+ }
+ }
+
+ if (hreq->request_acked && hreq->request_settled && DEQ_SIZE(hreq->out_data.fifo) == 0) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request msg-id=%"PRIu64" completed!",
+ hconn->conn_id, hreq->base.msg_id);
+
+ if (hreq->close_on_complete)
+ need_close = true;
+ else
+ next_hreq = (_server_request_t*) DEQ_NEXT(&hreq->base);
+
+ _server_request_free(hreq);
+
+ if (hconn->out_link)
+ qdr_link_flow(qdr_http1_adaptor->core, hconn->out_link, 1, false);
}
}
+
+ if (next_hreq) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"] starting new HTTP request msg-id=%"PRIu64,
+ hconn->conn_id, next_hreq->base.msg_id);
+ _write_pending_request(next_hreq);
+ }
+
+ return need_close;
}
@@ -773,8 +796,8 @@ static int _server_rx_response_cb(h1_codec_request_state_t *hrs,
assert(hreq && hreq == (_server_request_t*) DEQ_HEAD(hconn->requests));
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
- "[C%"PRIu64"][L%"PRIu64"] HTTP response received: status=%d phrase=%s version=%"PRIi32".%"PRIi32,
- hconn->conn_id, hconn->in_link_id, status_code, reason_phrase ? reason_phrase : "<NONE>",
+ "[C%"PRIu64"][L%"PRIu64"] HTTP msg_id=%"PRIu64" response received: status=%d phrase=%s version=%"PRIi32".%"PRIi32,
+ hconn->conn_id, hconn->in_link_id, hreq->base.msg_id, status_code, reason_phrase ? reason_phrase : "<NONE>",
version_major, version_minor);
_server_response_msg_t *rmsg = new__server_response_msg_t();
@@ -799,6 +822,7 @@ static int _server_rx_response_cb(h1_codec_request_state_t *hrs,
}
}
+ hreq->response_complete = false;
return 0;
}
@@ -885,8 +909,8 @@ static int _server_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo
hconn->in_link_credit -= 1;
qd_log(hconn->adaptor->log, QD_LOG_TRACE,
- "[C%"PRIu64"][L%"PRIu64"] Delivering response to router addr=%s",
- hconn->conn_id, hconn->in_link_id, hreq->base.response_addr);
+ "[C%"PRIu64"][L%"PRIu64"] Delivering msg-id=%"PRIu64" response to router addr=%s",
+ hconn->conn_id, hconn->in_link_id, hreq->base.msg_id, hreq->base.response_addr);
qd_iterator_t *addr = qd_message_field_iterator(rmsg->msg, QD_FIELD_TO);
assert(addr);
@@ -941,9 +965,10 @@ static void _server_rx_done_cb(h1_codec_request_state_t *hrs)
qd_message_t *msg = rmsg->msg ? rmsg->msg : qdr_delivery_message(rmsg->dlv);
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
- "[C%"PRIu64"][L%"PRIu64"] HTTP response receive complete.",
- hconn->conn_id, hconn->in_link_id);
+ "[C%"PRIu64"][L%"PRIu64"] HTTP response message msg-id=%"PRIu64" decoding complete.",
+ hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
+ hreq->response_complete = true;
rmsg->rx_complete = true;
if (!qd_message_receive_complete(msg)) {
@@ -1108,7 +1133,6 @@ static _server_request_t *_create_request_context(qdr_http1_connection_t *hconn,
reply_to = (char*) qd_iterator_copy(reply_to_itr);
qd_iterator_free(reply_to_itr);
- assert(reply_to && strlen(reply_to)); // remove me
if (!reply_to) {
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] Rejecting message no reply-to.",
@@ -1132,7 +1156,7 @@ static _server_request_t *_create_request_context(qdr_http1_connection_t *hconn,
DEQ_INSERT_TAIL(hconn->requests, &hreq->base);
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
- "[C%"PRIu64"][L%"PRIu64"] New HTTP Request msg_id=%"PRIu64" reply-to=%s.",
+ "[C%"PRIu64"][L%"PRIu64"] New HTTP Request msg-id=%"PRIu64" reply-to=%s.",
hconn->conn_id, hconn->out_link_id, msg_id, reply_to);
return hreq;
}
@@ -1271,12 +1295,13 @@ static uint64_t _encode_request_message(_server_request_t *hreq)
if (!hreq->headers_encoded) {
uint64_t outcome = _send_request_headers(hreq, msg);
+ hreq->headers_encoded = true;
if (outcome) {
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
- "[C%"PRIu64"][L%"PRIu64"] Rejecting malformed message.", hconn->conn_id, hconn->out_link_id);
+ "[C%"PRIu64"][L%"PRIu64"] Rejecting malformed message msg-id=%"PRIu64,
+ hconn->conn_id, hconn->out_link_id, hreq->base.msg_id);
return outcome;
}
- hreq->headers_encoded = true;
}
qd_message_stream_data_t *stream_data = 0;
@@ -1299,17 +1324,15 @@ static uint64_t _encode_request_message(_server_request_t *hreq)
}
case QD_MESSAGE_STREAM_DATA_FOOTER_OK:
+ qd_message_stream_data_release(stream_data);
break;
case QD_MESSAGE_STREAM_DATA_NO_MORE:
// indicate this message is complete
- qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
- "[C%"PRIu64"][L%"PRIu64"] request message encoding completed",
- hconn->conn_id, hconn->out_link_id);
return PN_ACCEPTED;
case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
- qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] body data need more",
hconn->conn_id, hconn->out_link_id);
return 0; // wait for more
@@ -1334,9 +1357,11 @@ uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t *adaptor,
qdr_delivery_t *delivery,
bool settled)
{
- qd_message_t *msg = qdr_delivery_message(delivery);
- _server_request_t *hreq = (_server_request_t*) qdr_delivery_get_context(delivery);
+ qd_message_t *msg = qdr_delivery_message(delivery);
+ if (qd_message_is_discard(msg))
+ return 0;
+ _server_request_t *hreq = (_server_request_t*) qdr_delivery_get_context(delivery);
if (!hreq) {
// new delivery - create new request:
switch (qd_message_check_depth(msg, QD_DEPTH_PROPERTIES)) {
@@ -1348,6 +1373,7 @@ uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t *adaptor,
"[C%"PRIu64"][L%"PRIu64"] Malformed HTTP/1.x message",
hconn->conn_id, link->identity);
qd_message_set_send_complete(msg);
+ qd_message_set_discard(msg, true);
qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
return PN_REJECTED;
@@ -1357,6 +1383,7 @@ uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t *adaptor,
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] Discarding malformed message.", hconn->conn_id, link->identity);
qd_message_set_send_complete(msg);
+ qd_message_set_discard(msg, true);
qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
return PN_REJECTED;
}
@@ -1368,21 +1395,28 @@ uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t *adaptor,
}
}
- if (!hreq->request_dispo)
+ if (!hreq->request_dispo) {
hreq->request_dispo = _encode_request_message(hreq);
-
- if (hreq->request_dispo && qd_message_receive_complete(msg)) {
-
- qd_message_set_send_complete(msg);
- qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
-
- if (hreq->request_dispo == PN_ACCEPTED) {
- hreq->request_encoded = true;
- h1_codec_tx_done(hreq->base.lib_rs, &hreq->close_on_complete);
-
- } else {
- // mapping to HTTP request failed:
- _cancel_request(hreq);
+ if (hreq->request_dispo) {
+ qd_message_set_send_complete(msg);
+ if (hreq->request_dispo == PN_ACCEPTED) {
+ h1_codec_tx_done(hreq->base.lib_rs, &hreq->close_on_complete);
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP request message msg-id=%"PRIu64" encoding complete",
+ hconn->conn_id, link->identity, hreq->base.msg_id);
+ } else {
+ // message invalid
+ qd_message_set_discard(msg, true);
+ _cancel_request(hreq);
+
+ // returning a terminal disposition will cause the delivery to be updated and settled,
+ // so drop our reference
+ qdr_delivery_set_context(hreq->request_dlv, 0);
+ qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "malformed HTTP1 request, delivery released");
+ hreq->request_dlv = 0;
+ hreq->request_acked = hreq->request_settled = true;
+ return hreq->request_dispo;
+ }
}
}
@@ -1435,12 +1469,13 @@ static void _server_request_free(_server_request_t *hreq)
static void _write_pending_request(_server_request_t *hreq)
{
- if (hreq && !hreq->cancelled && !hreq->base.hconn->close_connection) {
+ if (hreq && !hreq->cancelled) {
assert(DEQ_PREV(&hreq->base) == 0); // preserve order!
uint64_t written = qdr_http1_write_out_data(hreq->base.hconn, &hreq->out_data);
hreq->base.out_http1_octets += written;
- qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %"PRIu64" octets written",
- hreq->base.hconn->conn_id, written);
+ if (written)
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %"PRIu64" octets written",
+ hreq->base.hconn->conn_id, written);
}
}
@@ -1457,6 +1492,11 @@ void qdr_http1_server_conn_cleanup(qdr_http1_connection_t *hconn)
static void _cancel_request(_server_request_t *hreq)
{
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] Cancelling HTTP Request msg-id=%"PRIu64,
+ hreq->base.hconn->conn_id, hreq->base.hconn->out_link_id,
+ hreq->base.msg_id);
+
if (!hreq->base.lib_rs) {
// never even got to encoding it - manually mark it cancelled
hreq->cancelled = true;
diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py
index 4d1d4f7..237adca 100644
--- a/tests/system_tests_http1_adaptor.py
+++ b/tests/system_tests_http1_adaptor.py
@@ -40,10 +40,11 @@ except ImportError:
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from httplib import HTTPConnection, HTTPException
+from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
from system_test import TestCase, unittest, main_module, Qdrouterd, QdManager
-from system_test import TIMEOUT, Logger
+from system_test import TIMEOUT, Logger, AsyncTestSender, AsyncTestReceiver
class RequestMsg(object):
@@ -304,7 +305,12 @@ class ThreadedTestClient(object):
op,
req.target)})
self._logger.log("TestClient getting %s response" % op)
- rsp = client.getresponse()
+ try:
+ rsp = client.getresponse()
+ except HTTPException as exc:
+ self._logger.log("TestClient response failed: %s" % exc)
+ self.error = str(exc)
+ return
self._logger.log("TestClient response %s received" % op)
if val:
try:
@@ -1472,10 +1478,11 @@ class Http1AdaptorBadEndpointsTest(TestCase):
cls.http_server_port = cls.tester.get_port()
cls.http_listener_port = cls.tester.get_port()
+ cls.http_fake_port = cls.tester.get_port()
config = [
('router', {'mode': 'standalone',
- 'id': 'TestBadEnpoints',
+ 'id': 'TestBadEndpoints',
'allowUnsettledMulticast': 'yes'}),
('listener', {'role': 'normal',
'port': cls.tester.get_port()}),
@@ -1485,6 +1492,9 @@ class Http1AdaptorBadEndpointsTest(TestCase):
('httpListener', {'port': cls.http_listener_port,
'protocolVersion': 'HTTP1',
'address': 'testServer'}),
+ ('httpListener', {'port': cls.http_fake_port,
+ 'protocolVersion': 'HTTP1',
+ 'address': 'fakeServer'}),
('address', {'prefix': 'closest', 'distribution': 'closest'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
]
@@ -1522,6 +1532,161 @@ class Http1AdaptorBadEndpointsTest(TestCase):
self.assertIsNone(error)
self.assertEqual(1, count)
+ def test_02_bad_request_message(self):
+ """
+ Test various improperly constructed request messages
+ """
+ server = TestServer(server_port=self.http_server_port,
+ client_port=self.http_listener_port,
+ tests={})
+
+ body_filler = "?" * 1024 * 300 # Q2
+
+ msg = Message(body="NOMSGID " + body_filler)
+ ts = AsyncTestSender(address=self.INT_A.listener,
+ target="testServer",
+ message=msg)
+ ts.wait()
+ self.assertEqual(1, ts.rejected);
+
+ msg = Message(body="NO REPLY TO " + body_filler)
+ msg.id = 1
+ ts = AsyncTestSender(address=self.INT_A.listener,
+ target="testServer",
+ message=msg)
+ ts.wait()
+ self.assertEqual(1, ts.rejected);
+
+ msg = Message(body="NO SUBJECT " + body_filler)
+ msg.id = 1
+ msg.reply_to = "amqp://fake/reply_to"
+ ts = AsyncTestSender(address=self.INT_A.listener,
+ target="testServer",
+ message=msg)
+ ts.wait()
+ self.assertEqual(1, ts.rejected);
+
+ msg = Message(body="NO APP PROPERTIES " + body_filler)
+ msg.id = 1
+ msg.reply_to = "amqp://fake/reply_to"
+ msg.subject = "GET"
+ ts = AsyncTestSender(address=self.INT_A.listener,
+ target="testServer",
+ message=msg)
+ ts.wait()
+ self.assertEqual(1, ts.rejected);
+
+ # TODO: fix body parsing (returns NEED_MORE)
+ # msg = Message(body="INVALID BODY " + body_filler)
+ # msg.id = 1
+ # msg.reply_to = "amqp://fake/reply_to"
+ # msg.subject = "GET"
+ # msg.properties = {"http:target": "/Some/target"}
+ # ts = AsyncTestSender(address=self.INT_A.listener,
+ # target="testServer",
+ # message=msg)
+ # ts.wait()
+ # self.assertEqual(1, ts.rejected);
+
+ server.wait()
+
+ # verify router is still sane:
+ count, error = http1_ping(self.http_server_port,
+ self.http_listener_port)
+ self.assertIsNone(error)
+ self.assertEqual(1, count)
+
+ def test_03_bad_response_message(self):
+ """
+ Test various improperly constructed response messages
+ """
+ DUMMY_TESTS = {
+ "GET": [
+ (RequestMsg("GET", "/GET/test_03_bad_response_message",
+ headers={"Content-Length": "000"}),
+ None,
+ None,
+ ),
+ ]
+ }
+
+ body_filler = "?" * 1024 * 300 # Q2
+
+ # fake server
+ rx = AsyncTestReceiver(self.INT_A.listener,
+ source="fakeServer")
+
+ # no correlation id:
+ client = ThreadedTestClient(DUMMY_TESTS,
+ self.http_fake_port)
+ req = rx.queue.get(timeout=TIMEOUT)
+ resp = Message(body="NO CORRELATION ID " + body_filler)
+ resp.to = req.reply_to
+ ts = AsyncTestSender(address=self.INT_A.listener,
+ target=req.reply_to,
+ message=resp)
+ ts.wait()
+ self.assertEqual(1, ts.rejected);
+ client.wait()
+ self.assertIsNotNone(client.error)
+
+ # missing application properties
+ client = ThreadedTestClient(DUMMY_TESTS,
+ self.http_fake_port)
+ req = rx.queue.get(timeout=TIMEOUT)
+
+ resp = Message(body="NO APPLICATION PROPS " + body_filler)
+ resp.to = req.reply_to
+ resp.correlation_id = req.id
+ ts = AsyncTestSender(address=self.INT_A.listener,
+ target=req.reply_to,
+ message=resp)
+ ts.wait()
+ self.assertEqual(1, ts.rejected);
+ client.wait()
+ self.assertIsNotNone(client.error)
+
+ # no status application property
+ client = ThreadedTestClient(DUMMY_TESTS,
+ self.http_fake_port)
+ req = rx.queue.get(timeout=TIMEOUT)
+ resp = Message(body="MISSING STATUS HEADER " + body_filler)
+ resp.to = req.reply_to
+ resp.correlation_id = req.id
+ resp.properties = {"stuff": "value"}
+ ts = AsyncTestSender(address=self.INT_A.listener,
+ target=req.reply_to,
+ message=resp)
+ ts.wait()
+ self.assertEqual(1, ts.rejected);
+ client.wait()
+ self.assertIsNotNone(client.error)
+
+ # TODO: fix body parsing (returns NEED_MORE)
+ # # invalid body format
+ # client = ThreadedTestClient(DUMMY_TESTS,
+ # self.http_fake_port)
+ # req = rx.queue.get(timeout=TIMEOUT)
+ # resp = Message(body="INVALID BODY FORMAT " + body_filler)
+ # resp.to = req.reply_to
+ # resp.correlation_id = req.id
+ # resp.properties = {"http:status": 200}
+ # ts = AsyncTestSender(address=self.INT_A.listener,
+ # target=req.reply_to,
+ # message=resp)
+ # ts.wait()
+ # self.assertEqual(1, ts.rejected);
+ # client.wait()
+ # self.assertIsNotNone(client.error)
+
+ rx.stop()
+
+ # verify router is still sane:
+ count, error = http1_ping(self.http_server_port,
+ self.http_listener_port)
+ self.assertIsNone(error)
+ self.assertEqual(1, count)
+
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org