You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2021/06/28 19:00:43 UTC
[qpid-dispatch] branch main updated: DISPATCH-2162: HTTP1/x adaptor
- use larger I/O buffers
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push:
new 962da1e DISPATCH-2162: HTTP1/x adaptor - use larger I/O buffers
962da1e is described below
commit 962da1ede69f36ec5192a1e1c25e8f59b3c75949
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Wed Jun 9 12:08:49 2021 -0400
DISPATCH-2162: HTTP1/x adaptor - use larger I/O buffers
This closes #1263
---
src/adaptors/http1/http1_adaptor.c | 261 ++++++++++++++++---------------------
src/adaptors/http1/http1_client.c | 63 ++++-----
src/adaptors/http1/http1_private.h | 47 +++----
src/adaptors/http1/http1_server.c | 55 ++++----
4 files changed, 190 insertions(+), 236 deletions(-)
diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
index 5874ec9..bd3c189 100644
--- a/src/adaptors/http1/http1_adaptor.c
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -138,45 +138,31 @@ void qdr_http1_connection_free(qdr_http1_connection_t *hconn)
}
}
-
-void qdr_http1_out_data_fifo_cleanup(qdr_http1_out_data_fifo_t *out_data)
+static void _free_qdr_http1_out_data(qdr_http1_out_data_t *od)
{
- if (out_data) {
- // expect: all buffers returned from proton!
- // FIXME: not during router shutdown!
- // assert(qdr_http1_out_data_buffers_outstanding(out_data) == 0);
- qdr_http1_out_data_t *od = DEQ_HEAD(out_data->fifo);
- while (od) {
- DEQ_REMOVE_HEAD(out_data->fifo);
- if (od->stream_data)
- qd_message_stream_data_release(od->stream_data);
- else
- qd_buffer_list_free_buffers(&od->raw_buffers);
- free_qdr_http1_out_data_t(od);
- od = DEQ_HEAD(out_data->fifo);
- }
+ if (od) {
+ qd_iterator_free(od->data_iter);
+ if (od->stream_data)
+ qd_message_stream_data_release(od->stream_data);
+ else
+ qd_buffer_list_free_buffers(&od->raw_buffers);
+ free_qdr_http1_out_data_t(od);
}
}
-
-// Return the number of buffers in the process of being written out by the proactor.
-// These buffers are "owned" by proton - they must not be freed until proton has
-// released them.
-//
-int qdr_http1_out_data_buffers_outstanding(const qdr_http1_out_data_fifo_t *out_data)
+void qdr_http1_out_data_cleanup(qdr_http1_out_data_list_t *out_data)
{
- int count = 0;
if (out_data) {
- qdr_http1_out_data_t *od = DEQ_HEAD(out_data->fifo);
+ // expect: all buffers returned from proton!
+ // FIXME: not during router shutdown!
+ // assert(qdr_http1_out_data_buffers_outstanding(out_data) == 0);
+ qdr_http1_out_data_t *od = DEQ_HEAD(*out_data);
while (od) {
- count += od->next_buffer - od->free_count;
- if (od == out_data->write_ptr)
- break;
-
- od = DEQ_NEXT(od);
+ DEQ_REMOVE_HEAD(*out_data);
+ _free_qdr_http1_out_data(od);
+ od = DEQ_HEAD(*out_data);
}
}
- return count;
}
@@ -250,86 +236,51 @@ void qdr_http1_error_response(qdr_http1_request_base_t *hreq,
//
-// Write pending data out the raw connection. Preserve order by only writing
-// the head request data.
+// Write list of data out the raw connection, freeing entries when data is exhausted
//
-uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_data_fifo_t *fifo)
+uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_data_list_t *fifo)
{
- pn_raw_buffer_t buffers[RAW_BUFFER_BATCH];
size_t count = !hconn->raw_conn || pn_raw_connection_is_write_closed(hconn->raw_conn)
? 0
: pn_raw_connection_write_buffers_capacity(hconn->raw_conn);
- uint64_t total_octets = 0;
- qdr_http1_out_data_t *od = fifo->write_ptr;
- while (count > 0 && od) {
- qd_buffer_t *wbuf = 0;
- int od_len = MIN(count,
- (od->buffer_count - od->next_buffer));
- assert(od_len); // error: no data @ head?
-
- // send the out_data as a series of writes to proactor
-
- while (od_len) {
- size_t limit = MIN(RAW_BUFFER_BATCH, od_len);
- int written = 0;
-
- if (od->stream_data) { // buffers stored in qd_message_t
-
- written = qd_message_stream_data_buffers(od->stream_data, buffers, od->next_buffer, limit);
- for (int i = 0; i < written; ++i) {
- // enforce this: we expect the context can be used by the adaptor!
- assert(buffers[i].context == 0);
- buffers[i].context = (uintptr_t)od;
- total_octets += buffers[i].size;
- }
-
- } else { // list of buffers in od->raw_buffers
- // advance to next buffer to send in od
- if (!wbuf) {
- wbuf = DEQ_HEAD(od->raw_buffers);
- for (int i = 0; i < od->next_buffer; ++i)
- wbuf = DEQ_NEXT(wbuf);
- }
-
- pn_raw_buffer_t *rdisc = &buffers[0];
- while (limit--) {
- rdisc->context = (uintptr_t)od;
- rdisc->bytes = (char*) qd_buffer_base(wbuf);
- rdisc->capacity = 0;
- rdisc->size = qd_buffer_size(wbuf);
- rdisc->offset = 0;
-
- total_octets += rdisc->size;
- ++rdisc;
- wbuf = DEQ_NEXT(wbuf);
- written += 1;
- }
- }
-
- // keep me, you'll need it
- if (HTTP1_DUMP_BUFFERS) {
- for (size_t j = 0; j < written; ++j) {
- char *ptr = (char*) buffers[j].bytes;
- int len = (int) buffers[j].size;
- fprintf(stdout, "\n[C%"PRIu64"] Raw Write: Ptr=%p len=%d\n value='%.*s'\n",
- hconn->conn_id, (void*)ptr, len, len, ptr);
- fflush(stdout);
- }
- }
-
- written = pn_raw_connection_write_buffers(hconn->raw_conn, buffers, written);
- count -= written;
- od_len -= written;
- od->next_buffer += written;
+ if (hconn->write_buf_busy || count == 0)
+ return 0;
+
+ const size_t max_octets = HTTP1_IO_BUF_SIZE;
+ size_t total_octets = 0;
+ while (!DEQ_IS_EMPTY(*fifo) && total_octets < max_octets) {
+ qdr_http1_out_data_t *od = DEQ_HEAD(*fifo);
+ uint32_t data_octets = qd_iterator_remaining(od->data_iter);
+
+ size_t len = MIN(data_octets, max_octets - total_octets);
+ int copied = qd_iterator_ncopy(od->data_iter, &hconn->write_buffer[total_octets], len);
+ assert(copied == len);
+ data_octets -= copied;
+ total_octets += copied;
+
+ qd_iterator_trim_view(od->data_iter, data_octets);
+ if (qd_iterator_remaining(od->data_iter) == 0) {
+ DEQ_REMOVE_HEAD(*fifo);
+ _free_qdr_http1_out_data(od);
}
+ }
- if (od->next_buffer == od->buffer_count) {
- // all buffers in od have been passed to proton.
- od = DEQ_NEXT(od);
- fifo->write_ptr = od;
- wbuf = 0;
+ if (total_octets) {
+ pn_raw_buffer_t pn_buff = {0};
+ pn_buff.bytes = (char*) hconn->write_buffer;
+ pn_buff.size = total_octets;
+
+ // keep me, you'll need it
+ if (HTTP1_DUMP_BUFFERS) {
+ fprintf(stdout, "\n[C%"PRIu64"] Raw Write: Ptr=%p len=%"PRIu32"\n value='%.*s'\n",
+ hconn->conn_id, (void*)pn_buff.bytes, pn_buff.size,
+ (int) pn_buff.size, pn_buff.bytes);
+ fflush(stdout);
}
+
+ pn_raw_connection_write_buffers(hconn->raw_conn, &pn_buff, 1);
+ hconn->write_buf_busy = true;
}
hconn->out_http1_octets += total_octets;
@@ -340,20 +291,15 @@ uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_d
// The HTTP encoder has a list of buffers to be written to the raw connection.
// Queue it to the outgoing data fifo.
//
-void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_fifo_t *fifo, qd_buffer_list_t *blist)
+void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_list_t *fifo, qd_buffer_list_t *blist, uintmax_t octets)
{
- int count = (int) DEQ_SIZE(*blist);
- if (count) {
+ if (octets) {
qdr_http1_out_data_t *od = new_qdr_http1_out_data_t();
ZERO(od);
- od->owning_fifo = fifo;
- od->buffer_count = (int) DEQ_SIZE(*blist);
od->raw_buffers = *blist;
+ od->data_iter = qd_iterator_buffer(DEQ_HEAD(od->raw_buffers), 0, (int)octets, ITER_VIEW_ALL);
DEQ_INIT(*blist);
-
- DEQ_INSERT_TAIL(fifo->fifo, od);
- if (!fifo->write_ptr)
- fifo->write_ptr = od;
+ DEQ_INSERT_TAIL(*fifo, od);
}
}
@@ -361,19 +307,14 @@ void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_fifo_t *fifo, qd_buffer_li
// The HTTP encoder has a message body data to be written to the raw connection.
// Queue it to the outgoing data fifo.
//
-void qdr_http1_enqueue_stream_data(qdr_http1_out_data_fifo_t *fifo, qd_message_stream_data_t *stream_data)
+void qdr_http1_enqueue_stream_data(qdr_http1_out_data_list_t *fifo, qd_message_stream_data_t *stream_data)
{
- int count = qd_message_stream_data_buffer_count(stream_data);
- if (count) {
+ if (qd_message_stream_data_payload_length(stream_data)) {
qdr_http1_out_data_t *od = new_qdr_http1_out_data_t();
ZERO(od);
- od->owning_fifo = fifo;
od->stream_data = stream_data;
- od->buffer_count = count;
-
- DEQ_INSERT_TAIL(fifo->fifo, od);
- if (!fifo->write_ptr)
- fifo->write_ptr = od;
+ od->data_iter = qd_message_stream_data_iterator(stream_data);
+ DEQ_INSERT_TAIL(*fifo, od);
} else {
// empty body-data
qd_message_stream_data_release(stream_data);
@@ -385,39 +326,61 @@ void qdr_http1_enqueue_stream_data(qdr_http1_out_data_fifo_t *fifo, qd_message_s
//
void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn)
{
- pn_raw_buffer_t buffers[RAW_BUFFER_BATCH];
- size_t count;
- while ((count = pn_raw_connection_take_written_buffers(hconn->raw_conn, buffers, RAW_BUFFER_BATCH)) != 0) {
- for (size_t i = 0; i < count; ++i) {
-
- // keep me, you'll need it
- if (HTTP1_DUMP_BUFFERS) {
- char *ptr = (char*) buffers[i].bytes;
- int len = (int) buffers[i].size;
- fprintf(stdout, "\n[C%"PRIu64"] Raw Written: Ptr=%p len=%d c=%d o=%d\n value='%.*s'\n",
- hconn->conn_id, (void*)ptr, len, buffers[i].capacity, buffers[i].offset, len, ptr);
- fflush(stdout);
- }
+ pn_raw_buffer_t pn_buff = {0};
+
+ if (pn_raw_connection_take_written_buffers(hconn->raw_conn, &pn_buff, 1) != 0) {
+ assert(hconn->write_buf_busy); // expect write buffer in use
+
+ // keep me, you'll need it
+ if (HTTP1_DUMP_BUFFERS) {
+ char *ptr = (char*) pn_buff.bytes;
+ int len = (int) pn_buff.size;
+ fprintf(stdout, "\n[C%"PRIu64"] Raw Written: Ptr=%p len=%d c=%d o=%d\n value='%.*s'\n",
+ hconn->conn_id, (void*)ptr, len, pn_buff.capacity, pn_buff.offset, len, ptr);
+ fflush(stdout);
+ }
- qdr_http1_out_data_t *od = (qdr_http1_out_data_t*) buffers[i].context;
- assert(od);
- // Note: according to proton devs the order in which write buffers
- // are released are NOT guaranteed to be in the same order in which
- // they were written!
-
- od->free_count += 1;
- if (od->free_count == od->buffer_count) {
- // all buffers returned
- qdr_http1_out_data_fifo_t *fifo = od->owning_fifo;
- DEQ_REMOVE(fifo->fifo, od);
- if (od->stream_data)
- qd_message_stream_data_release(od->stream_data);
- else
- qd_buffer_list_free_buffers(&od->raw_buffers);
- free_qdr_http1_out_data_t(od);
- }
+ hconn->write_buf_busy = false;
+ }
+}
+
+
+//
+// Raw Connection Read Buffer Management
+//
+
+int qdr_http1_grant_read_buffers(qdr_http1_connection_t *hconn)
+{
+ if (!hconn->read_buf_busy && hconn->raw_conn
+ && pn_raw_connection_read_buffers_capacity(hconn->raw_conn) > 0) {
+
+ pn_raw_buffer_t pn_buf = {0};
+ pn_buf.bytes = (char*) hconn->read_buffer;
+ pn_buf.capacity = HTTP1_IO_BUF_SIZE;
+ pn_raw_connection_give_read_buffers(hconn->raw_conn, &pn_buf, 1);
+ hconn->read_buf_busy = true;
+ return 1;
+ }
+ return 0;
+}
+
+// take incoming data from raw connection
+uintmax_t qdr_http1_get_read_buffers(qdr_http1_connection_t *hconn,
+ qd_buffer_list_t *blist)
+{
+ pn_raw_buffer_t pn_buff;
+ DEQ_INIT(*blist);
+ uintmax_t octets = 0;
+
+ if (hconn->raw_conn && pn_raw_connection_take_read_buffers(hconn->raw_conn,
+ &pn_buff, 1)) {
+ if (pn_buff.size) {
+ octets = pn_buff.size;
+ qd_buffer_list_append(blist, (uint8_t*) pn_buff.bytes, pn_buff.size);
}
+ hconn->read_buf_busy = false;
}
+ return octets;
}
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 204f7a7..72a31da 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -50,7 +50,7 @@ typedef struct _client_response_msg_t {
bool encoded; // true when full response encoded
// HTTP encoded message data
- qdr_http1_out_data_fifo_t out_data;
+ qdr_http1_out_data_list_t out_data;
} _client_response_msg_t;
ALLOC_DECLARE(_client_response_msg_t);
@@ -383,13 +383,13 @@ static int _handle_conn_read_event(qdr_http1_connection_t *hconn)
{
int error = 0;
qd_buffer_list_t blist;
- uintmax_t length;
- qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length);
+ uintmax_t length = qdr_http1_get_read_buffers(hconn, &blist);
+
if (length) {
+
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from client (%zu buffers)",
hconn->conn_id, hconn->in_link_id, length, DEQ_SIZE(blist));
- hconn->in_http1_octets += length;
if (HTTP1_DUMP_BUFFERS) {
fprintf(stdout, "\nClient raw buffer READ %"PRIuMAX" total octets\n", length);
@@ -401,6 +401,7 @@ static int _handle_conn_read_event(qdr_http1_connection_t *hconn)
fflush(stdout);
}
+ hconn->in_http1_octets += length;
error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
}
return error;
@@ -412,7 +413,7 @@ static void _handle_conn_need_read_buffers(qdr_http1_connection_t *hconn)
{
// @TODO(kgiusti): backpressure if no credit
if (hconn->client.reply_to_addr || hconn->cfg.event_channel /* && hconn->in_link_credit > 0 */) {
- int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+ int granted = qdr_http1_grant_read_buffers(hconn);
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
hconn->conn_id, granted);
}
@@ -481,11 +482,26 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
_write_pending_response((_client_request_t*) DEQ_HEAD(hconn->requests));
break;
}
+ case PN_RAW_CONNECTION_WRITTEN: {
+ qdr_http1_free_written_buffers(hconn);
+ break;
+ }
case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", hconn->conn_id);
_handle_conn_need_read_buffers(hconn);
break;
}
+ case PN_RAW_CONNECTION_READ: {
+ if (!hconn->q2_blocked) {
+ int error = _handle_conn_read_event(hconn);
+ if (error)
+ qdr_http1_close_connection(hconn, "Incoming response message failed to parse");
+ else
+ // room for more incoming data
+ _handle_conn_need_read_buffers(hconn);
+ }
+ break;
+ }
case PN_RAW_CONNECTION_WAKE: {
int error = 0;
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Wake-up", hconn->conn_id);
@@ -495,7 +511,9 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] client link unblocked from Q2 limit", hconn->conn_id);
hconn->q2_blocked = false;
error = _handle_conn_read_event(hconn); // restart receiving
- _handle_conn_need_read_buffers(hconn);
+ if (!error)
+ // room for more incoming data
+ _handle_conn_need_read_buffers(hconn);
}
while (qdr_connection_process(hconn->qdr_conn)) {}
@@ -506,18 +524,6 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Processing done", hconn->conn_id);
break;
}
- case PN_RAW_CONNECTION_READ: {
- if (!hconn->q2_blocked) {
- int error = _handle_conn_read_event(hconn);
- if (error)
- qdr_http1_close_connection(hconn, "Incoming response message failed to parse");
- }
- break;
- }
- case PN_RAW_CONNECTION_WRITTEN: {
- qdr_http1_free_written_buffers(hconn);
- break;
- }
default:
break;
}
@@ -556,7 +562,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
hconn->out_link_id, hreq->error_code, hreq->error_text);
_client_response_msg_t *rmsg = new__client_response_msg_t();
ZERO(rmsg);
- DEQ_INIT(rmsg->out_data.fifo);
+ DEQ_INIT(rmsg->out_data);
DEQ_INSERT_TAIL(hreq->responses, rmsg);
qdr_http1_error_response(&hreq->base, hreq->error_code, hreq->error_text);
_write_pending_response(hreq);
@@ -566,7 +572,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
_client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
while (rmsg &&
rmsg->dispo &&
- DEQ_IS_EMPTY(rmsg->out_data.fifo) &&
+ DEQ_IS_EMPTY(rmsg->out_data) &&
hconn->cfg.aggregation == QD_AGGREGATION_NONE) {
// response message fully received and forwarded to client
if (rmsg->dlv) {
@@ -660,7 +666,7 @@ static void _client_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_
rmsg = DEQ_HEAD(hreq->responses);
}
assert(rmsg);
- qdr_http1_enqueue_buffer_list(&rmsg->out_data, blist);
+ qdr_http1_enqueue_buffer_list(&rmsg->out_data, blist, len);
// if this happens to be the current outgoing response try writing to the
// raw connection
@@ -1033,12 +1039,7 @@ void qdr_http1_client_core_link_flow(qdr_http1_adaptor_t *adaptor,
hconn->in_link_credit += credit;
if (hconn->in_link_credit > 0) {
- if (hconn->raw_conn) {
- int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
- qd_log(adaptor->log, QD_LOG_DEBUG,
- "[C%"PRIu64"] %d read buffers granted",
- hconn->conn_id, granted);
- }
+ _handle_conn_need_read_buffers(hconn);
// is the current request message blocked by lack of credit?
@@ -1327,7 +1328,7 @@ void qdr_http1_client_core_delivery_update(qdr_http1_adaptor_t *adaptor,
// if nothing has been sent back so far
_client_response_msg_t *rmsg = new__client_response_msg_t();
ZERO(rmsg);
- DEQ_INIT(rmsg->out_data.fifo);
+ DEQ_INIT(rmsg->out_data);
DEQ_INSERT_TAIL(hreq->responses, rmsg);
if (disp == PN_REJECTED) {
@@ -1600,7 +1601,7 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t *adaptor,
_client_response_msg_t *rmsg = new__client_response_msg_t();
ZERO(rmsg);
rmsg->dlv = delivery;
- DEQ_INIT(rmsg->out_data.fifo);
+ DEQ_INIT(rmsg->out_data);
qdr_delivery_set_context(delivery, hreq);
qdr_delivery_incref(delivery, "HTTP1 client referencing response delivery");
DEQ_INSERT_TAIL(hreq->responses, rmsg);
@@ -1678,7 +1679,7 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t *adaptor,
static void _client_response_msg_free(_client_request_t *req, _client_response_msg_t *rmsg)
{
DEQ_REMOVE(req->responses, rmsg);
- qdr_http1_out_data_fifo_cleanup(&rmsg->out_data);
+ qdr_http1_out_data_cleanup(&rmsg->out_data);
if (rmsg->dlv) {
qdr_delivery_set_context(rmsg->dlv, 0);
@@ -1697,7 +1698,7 @@ static void _write_pending_response(_client_request_t *hreq)
if (hreq && !hreq->cancelled) {
assert(DEQ_PREV(&hreq->base) == 0); // must preserve order
_client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
- if (rmsg && rmsg->out_data.write_ptr) {
+ if (rmsg && DEQ_HEAD(rmsg->out_data)) {
uint64_t written = qdr_http1_write_out_data(hreq->base.hconn, &rmsg->out_data);
hreq->base.out_http1_octets += written;
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %"PRIu64" octets written",
diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h
index f654fc5..19ffa0c 100644
--- a/src/adaptors/http1/http1_private.h
+++ b/src/adaptors/http1/http1_private.h
@@ -38,8 +38,9 @@
// for debug: will dump I/O buffer content to stdout if true
#define HTTP1_DUMP_BUFFERS false
+#define HTTP1_IO_BUF_SIZE 16384
+
typedef struct qdr_http1_out_data_t qdr_http1_out_data_t;
-typedef struct qdr_http1_out_data_fifo_t qdr_http1_out_data_fifo_t;
typedef struct qdr_http1_request_base_t qdr_http1_request_base_t;
typedef struct qdr_http1_connection_t qdr_http1_connection_t;
@@ -63,7 +64,7 @@ extern qdr_http1_adaptor_t *qdr_http1_adaptor;
// This adaptor has to cope with two different data sources: the HTTP1 encoder
// and the qd_message_stream_data_t list. The HTTP1 encoder produces a simple
// qd_buffer_list_t for outgoing header data whose ownership is given to the
-// adaptor: the adaptor is free to deque/free these buffers as needed. The
+// adaptor: the adaptor is free to dequeue/free these buffers as needed. The
// qd_message_stream_data_t buffers are shared with the owning message and the
// buffer list must not be modified by the adaptor. The qdr_http1_out_data_t
// is used to manage both types of data sources.
@@ -71,36 +72,19 @@ extern qdr_http1_adaptor_t *qdr_http1_adaptor;
struct qdr_http1_out_data_t {
DEQ_LINKS(qdr_http1_out_data_t);
- qdr_http1_out_data_fifo_t *owning_fifo;
-
// data is either in a raw buffer chain
// or a message body data (not both!)
qd_buffer_list_t raw_buffers;
qd_message_stream_data_t *stream_data;
- int buffer_count; // # total buffers
- int next_buffer; // offset to next buffer to send
- int free_count; // # buffers returned from proton
+ // points to the data contained in the stream_data/raw_buffers
+ qd_iterator_t *data_iter;
};
ALLOC_DECLARE(qdr_http1_out_data_t);
DEQ_DECLARE(qdr_http1_out_data_t, qdr_http1_out_data_list_t);
-//
-// A fifo of outgoing (raw connection) data, oldest at HEAD.
-//
-// write_ptr tracks the point in the fifo where the current out_data node that
-// is being written to the raw connection. As the raw connection returns
-// written buffers (PN_RAW_CONNECTION_WRITTEN) the are removed from the HEAD
-// and freed.
-//
-struct qdr_http1_out_data_fifo_t {
- qdr_http1_out_data_list_t fifo;
- qdr_http1_out_data_t *write_ptr;
-};
-
-
// Per HTTP request/response(s) state.
//
// This base class is extended for client and server-specific state, see
@@ -188,6 +172,14 @@ struct qdr_http1_connection_t {
// flags
//
bool trace;
+
+ //
+ //
+ bool read_buf_busy;
+ bool write_buf_busy;
+
+ uint8_t read_buffer[HTTP1_IO_BUF_SIZE];
+ uint8_t write_buffer[HTTP1_IO_BUF_SIZE];
};
ALLOC_DECLARE(qdr_http1_connection_t);
@@ -207,12 +199,13 @@ ALLOC_DECLARE(qdr_http1_connection_t);
//void qdr_http1_write_buffer_list(qdr_http1_request_t *hreq, qd_buffer_list_t *blist);
void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn);
-void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_fifo_t *fifo, qd_buffer_list_t *blist);
-void qdr_http1_enqueue_stream_data(qdr_http1_out_data_fifo_t *fifo, qd_message_stream_data_t *stream_data);
-uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_data_fifo_t *fifo);
-void qdr_http1_out_data_fifo_cleanup(qdr_http1_out_data_fifo_t *out_data);
-// return the number of buffers currently held by the proactor for writing
-int qdr_http1_out_data_buffers_outstanding(const qdr_http1_out_data_fifo_t *out_data);
+void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_list_t *fifo, qd_buffer_list_t *blist, uintmax_t octets);
+void qdr_http1_enqueue_stream_data(qdr_http1_out_data_list_t *fifo, qd_message_stream_data_t *stream_data);
+uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_data_list_t *fifo);
+void qdr_http1_out_data_cleanup(qdr_http1_out_data_list_t *out_data);
+int qdr_http1_grant_read_buffers(qdr_http1_connection_t *hconn);
+uintmax_t qdr_http1_get_read_buffers(qdr_http1_connection_t *hconn,
+ qd_buffer_list_t *blist);
void qdr_http1_close_connection(qdr_http1_connection_t *hconn, const char *error);
void qdr_http1_connection_free(qdr_http1_connection_t *hconn);
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index bcdb3e5..bb6d252 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -74,7 +74,8 @@ typedef struct _server_request_t {
bool request_discard; // drop incoming request data
bool headers_encoded; // True when header encode done
- qdr_http1_out_data_fifo_t out_data; // encoded request written to raw conn
+ // fifo of encoded request data to be written out the raw connection:
+ qdr_http1_out_data_list_t out_data;
_server_response_msg_list_t responses; // response(s) to this request
@@ -476,24 +477,25 @@ static int _handle_conn_read_event(qdr_http1_connection_t *hconn)
{
int error = 0;
qd_buffer_list_t blist;
- uintmax_t length;
- qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length);
-
- if (HTTP1_DUMP_BUFFERS) {
- fprintf(stdout, "\nServer raw buffer READ %"PRIuMAX" total octets\n", length);
- qd_buffer_t *bb = DEQ_HEAD(blist);
- while (bb) {
- fprintf(stdout, " buffer='%.*s'\n", (int)qd_buffer_size(bb), (char*)&bb[1]);
- bb = DEQ_NEXT(bb);
- }
- fflush(stdout);
- }
+ uintmax_t length = qdr_http1_get_read_buffers(hconn, &blist);
if (length) {
+
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from server (%zu buffers)",
hconn->conn_id, hconn->in_link_id, length, DEQ_SIZE(blist));
+
+ if (HTTP1_DUMP_BUFFERS) {
+ fprintf(stdout, "\nServer raw buffer READ %"PRIuMAX" total octets\n", length);
+ qd_buffer_t *bb = DEQ_HEAD(blist);
+ while (bb) {
+ fprintf(stdout, " buffer='%.*s'\n", (int)qd_buffer_size(bb), (char*)&bb[1]);
+ bb = DEQ_NEXT(bb);
+ }
+ fflush(stdout);
+ }
+
hconn->in_http1_octets += length;
error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
}
@@ -506,7 +508,7 @@ static void _handle_conn_need_read_buffers(qdr_http1_connection_t *hconn)
{
// @TODO(kgiusti): backpressure if no credit
// if (hconn->in_link_credit > 0 */)
- int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+ int granted = qdr_http1_grant_read_buffers(hconn);
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
hconn->conn_id, granted);
}
@@ -549,7 +551,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
_server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
if (_is_request_in_progress(hreq)) {
hreq->request_discard = true;
- qdr_http1_out_data_fifo_cleanup(&hreq->out_data);
+ qdr_http1_out_data_cleanup(&hreq->out_data);
}
pn_raw_connection_close(hconn->raw_conn);
break;
@@ -608,8 +610,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
break;
}
case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
- if (!hconn->q2_blocked)
- _handle_conn_need_read_buffers(hconn);
+ _handle_conn_need_read_buffers(hconn);
break;
}
case PN_RAW_CONNECTION_WAKE: {
@@ -621,7 +622,9 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] server link unblocked from Q2 limit", hconn->conn_id);
hconn->q2_blocked = false;
error = _handle_conn_read_event(hconn); // restart receiving
- _handle_conn_need_read_buffers(hconn);
+ if (!error)
+ // room for more incoming data
+ _handle_conn_need_read_buffers(hconn);
}
while (qdr_connection_process(hconn->qdr_conn)) {}
@@ -680,11 +683,6 @@ static bool _process_request(_server_request_t *hreq)
if (hreq->cancelled) {
- // have to wait until all buffers returned from proton
- // before we can release the request
- if (qdr_http1_out_data_buffers_outstanding(&hreq->out_data))
- return false;
-
// clean up the request message delivery
if (hreq->request_dlv) {
@@ -772,7 +770,7 @@ static bool _process_request(_server_request_t *hreq)
}
}
- if (hreq->request_acked && hreq->request_settled && DEQ_SIZE(hreq->out_data.fifo) == 0) {
+ if (hreq->request_acked && hreq->request_settled && DEQ_SIZE(hreq->out_data) == 0) {
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request msg-id=%"PRIu64" completed!",
hconn->conn_id, hreq->base.msg_id);
_server_request_free(hreq);
@@ -804,7 +802,7 @@ static void _server_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Sending %u octets to server",
hconn->conn_id, hconn->out_link_id, len);
- qdr_http1_enqueue_buffer_list(&hreq->out_data, blist);
+ qdr_http1_enqueue_buffer_list(&hreq->out_data, blist, len);
}
}
@@ -1150,8 +1148,7 @@ void qdr_http1_server_core_link_flow(qdr_http1_adaptor_t *adaptor,
if (hconn->in_link_credit > 0) {
- if (hconn->raw_conn && !hconn->q2_blocked)
- qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+ _handle_conn_need_read_buffers(hconn);
// check for pending responses that are blocked for credit
@@ -1293,7 +1290,7 @@ static _server_request_t *_create_request_context(qdr_http1_connection_t *hconn,
hreq->base.response_addr = reply_to;
hreq->base.site = group_id;
hreq->base.start = qd_timer_now();
- DEQ_INIT(hreq->out_data.fifo);
+ DEQ_INIT(hreq->out_data);
DEQ_INIT(hreq->responses);
DEQ_INSERT_TAIL(hconn->requests, &hreq->base);
@@ -1644,7 +1641,7 @@ static void _server_request_free(_server_request_t *hreq)
{
if (hreq) {
qdr_http1_request_base_cleanup(&hreq->base);
- qdr_http1_out_data_fifo_cleanup(&hreq->out_data);
+ qdr_http1_out_data_cleanup(&hreq->out_data);
if (hreq->request_dlv) {
qdr_delivery_set_context(hreq->request_dlv, 0);
qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 server releasing request delivery");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org