You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2020/10/19 18:38:15 UTC

[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1744: fix input body parsing to avoid inserting empty buffers

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
     new 14ceb76  DISPATCH-1744: fix input body parsing to avoid inserting empty buffers
14ceb76 is described below

commit 14ceb7687c8a20c932a3430d575df38301a1be52
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Mon Oct 19 13:22:54 2020 -0400

    DISPATCH-1744: fix input body parsing to avoid inserting empty buffers
---
 include/qpid/dispatch/http1_codec.h |  2 +-
 src/adaptors/http1/http1_client.c   | 14 +-----
 src/adaptors/http1/http1_codec.c    | 86 ++++++++++++++++++-------------------
 src/adaptors/http1/http1_server.c   | 14 +-----
 4 files changed, 48 insertions(+), 68 deletions(-)

diff --git a/include/qpid/dispatch/http1_codec.h b/include/qpid/dispatch/http1_codec.h
index ccb8864..c1e384d 100644
--- a/include/qpid/dispatch/http1_codec.h
+++ b/include/qpid/dispatch/http1_codec.h
@@ -127,7 +127,7 @@ typedef struct h1_codec_config_t {
     int (*rx_header)(h1_codec_request_state_t *hrs, const char *key, const char *value);
     int (*rx_headers_done)(h1_codec_request_state_t *hrs, bool has_body);
 
-    int (*rx_body)(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t offset, uintmax_t len, bool more);
+    int (*rx_body)(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, uintmax_t len, bool more);
 
     // Invoked after a received HTTP message has been completely parsed.
     //
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 9d87f9d..1976f84 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -99,8 +99,7 @@ static int _client_rx_response_cb(h1_codec_request_state_t *lib_rs,
                                   uint32_t version_minor);
 static int _client_rx_header_cb(h1_codec_request_state_t *lib_rs, const char *key, const char *value);
 static int _client_rx_headers_done_cb(h1_codec_request_state_t *lib_rs, bool has_body);
-static int _client_rx_body_cb(h1_codec_request_state_t *lib_rs, qd_buffer_list_t *body, size_t offset, uintmax_t len,
-                              bool more);
+static int _client_rx_body_cb(h1_codec_request_state_t *lib_rs, qd_buffer_list_t *body, uintmax_t len, bool more);
 static void _client_rx_done_cb(h1_codec_request_state_t *lib_rs);
 static void _client_request_complete_cb(h1_codec_request_state_t *lib_rs, bool cancelled);
 static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context);
@@ -763,7 +762,7 @@ static int _client_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo
 // Called with decoded body data.  This may be called multiple times as body
 // data becomes available.
 //
-static int _client_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t offset, size_t len,
+static int _client_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t len,
                               bool more)
 {
     _client_request_t       *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
@@ -774,15 +773,6 @@ static int _client_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b
            "[C%"PRIu64"][L%"PRIu64"] HTTP request body received len=%zu.",
            hconn->conn_id, hconn->in_link_id, len);
 
-    if (offset) {
-        // dispatch assumes all body data starts at the buffer base so it cannot deal with offsets.
-        // Remove the offset by shifting the content of the head buffer forward
-        //
-        qd_buffer_t *head = DEQ_HEAD(*body);
-        memmove(qd_buffer_base(head), qd_buffer_base(head) + offset, qd_buffer_size(head) - offset);
-        head->size -= offset;
-    }
-
     //
     // Compose a DATA performative for this section of the stream
     //
diff --git a/src/adaptors/http1/http1_codec.c b/src/adaptors/http1/http1_codec.c
index e1348d0..16162f5 100644
--- a/src/adaptors/http1/http1_codec.c
+++ b/src/adaptors/http1/http1_codec.c
@@ -968,44 +968,51 @@ static bool parse_header(h1_codec_connection_t *conn, struct decoder_t *decoder)
 //
 static inline int consume_body_data(h1_codec_connection_t *conn, bool flush)
 {
-    struct decoder_t *decoder = &conn->decoder;
+    struct decoder_t       *decoder = &conn->decoder;
     qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
-    qd_iterator_pointer_t *rptr = &decoder->read_ptr;
-
-    // shortcut: if no more data to parse send the entire incoming chain
-    if (rptr->remaining == 0) {
+    qd_iterator_pointer_t     *rptr = &decoder->read_ptr;
+    qd_buffer_list_t          blist = DEQ_EMPTY;
+    size_t                   octets = 0;
 
-        decoder->hrs->in_octets += body_ptr->remaining;
-        decoder->error = conn->config.rx_body(decoder->hrs, &decoder->incoming,
-                                              body_ptr->cursor - qd_buffer_base(body_ptr->buffer),
-                                              body_ptr->remaining,
-                                              true);
-        DEQ_INIT(decoder->incoming);
-        *body_ptr = NULL_I_PTR;
-        *rptr = NULL_I_PTR;
-        return decoder->error;
-    }
+    // invariant:
+    assert(DEQ_HEAD(decoder->incoming) == body_ptr->buffer);
 
     // The read pointer points to somewhere in the buffer chain that contains some
     // unparsed data.  Send any buffers preceding the current read pointer.
-    qd_buffer_list_t blist = DEQ_EMPTY;
-    size_t octets = 0;
-    size_t body_offset = body_ptr->cursor - qd_buffer_base(body_ptr->buffer);
+    while (body_ptr->remaining) {
 
-    // invariant:
-    assert(DEQ_HEAD(decoder->incoming) == body_ptr->buffer);
+        if (body_ptr->buffer == rptr->buffer && rptr->remaining > 0)
+            break;
 
-    while (body_ptr->buffer && body_ptr->buffer != rptr->buffer) {
         DEQ_REMOVE_HEAD(decoder->incoming);
-        DEQ_INSERT_TAIL(blist, body_ptr->buffer);
-        octets += qd_buffer_cursor(body_ptr->buffer) - body_ptr->cursor;
+
+        size_t offset = body_ptr->cursor - qd_buffer_base(body_ptr->buffer);
+        if (offset) {
+            // most (all?) message buffer operations assume the message
+            // data starts at the buffer_base, so shift the content down to the base.
+            memmove(qd_buffer_base(body_ptr->buffer),
+                    qd_buffer_base(body_ptr->buffer) + offset,
+                    qd_buffer_size(body_ptr->buffer) - offset);
+            body_ptr->cursor = qd_buffer_base(body_ptr->buffer);
+            body_ptr->buffer->size -= offset;
+        }
+
+        if (qd_buffer_size(body_ptr->buffer) > 0) {
+            DEQ_INSERT_TAIL(blist, body_ptr->buffer);
+            octets += qd_buffer_size(body_ptr->buffer);
+            body_ptr->remaining -= qd_buffer_size(body_ptr->buffer);
+        } else {
+            qd_buffer_free(body_ptr->buffer);
+        }
         body_ptr->buffer = DEQ_HEAD(decoder->incoming);
-        body_ptr->cursor = qd_buffer_base(body_ptr->buffer);
+        body_ptr->cursor = body_ptr->buffer ? qd_buffer_base(body_ptr->buffer) : 0;
     }
 
     // invariant:
-    assert(octets <= body_ptr->remaining);
-    body_ptr->remaining -= octets;
+    assert(body_ptr->remaining >= 0);
+
+    // At this point, if there are any body bytes remaining, they are in the
+    // same buffer as the unparsed input (rptr).
 
     if (flush && body_ptr->remaining) {
         // need to copy out remaining body octets into new buffer
@@ -1016,16 +1023,13 @@ static inline int consume_body_data(h1_codec_connection_t *conn, bool flush)
         qd_buffer_insert(tail, body_ptr->remaining);
         DEQ_INSERT_TAIL(blist, tail);
         octets += body_ptr->remaining;
-        if (DEQ_SIZE(blist) == 1)
-            body_offset = 0;
-
         *body_ptr = *rptr;
         body_ptr->remaining = 0;
     }
 
     if (octets) {
         decoder->hrs->in_octets += octets;
-        decoder->error = conn->config.rx_body(decoder->hrs, &blist, body_offset, octets, true);
+        decoder->error = conn->config.rx_body(decoder->hrs, &blist, octets, true);
     }
     return decoder->error;
 }
@@ -1178,21 +1182,17 @@ static bool parse_body(h1_codec_connection_t *conn, struct decoder_t *decoder)
 
     // otherwise no explict body size, so just keep passing the entire unparsed
     // incoming chain along until the remote closes the connection
-    decoder->hrs->in_octets += decoder->read_ptr.remaining;
-    decoder->error = conn->config.rx_body(decoder->hrs,
-                                          &decoder->incoming,
-                                          decoder->read_ptr.cursor
-                                          - qd_buffer_base(decoder->read_ptr.buffer),
-                                          decoder->read_ptr.remaining,
-                                          true);
-    if (decoder->error) {
-        decoder->error_msg = "hrs_rx_body callback error";
-        return false;
+    if (decoder->read_ptr.remaining) {
+        // extend body_ptr to consume all unparsed read data
+        decoder->body_ptr.remaining += decoder->read_ptr.remaining;
+        decoder->read_ptr.remaining = 0;
+        decoder->read_ptr.buffer = DEQ_TAIL(decoder->incoming);
+        decoder->read_ptr.cursor = qd_buffer_cursor(decoder->read_ptr.buffer);
+        consume_body_data(conn, true);
+        decoder->body_ptr = decoder->read_ptr = NULL_I_PTR;
+        DEQ_INIT(decoder->incoming);
     }
 
-    decoder->body_ptr = decoder->read_ptr = NULL_I_PTR;
-    DEQ_INIT(decoder->incoming);
-
     return false;
 }
 
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index f678bde..b3a17f2 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -108,8 +108,7 @@ static int  _server_rx_response_cb(h1_codec_request_state_t *hrs,
                                    uint32_t version_minor);
 static int _server_rx_header_cb(h1_codec_request_state_t *hrs, const char *key, const char *value);
 static int _server_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_body);
-static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t offset, size_t len,
-                              bool more);
+static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t len, bool more);
 static void _server_rx_done_cb(h1_codec_request_state_t *hrs);
 static void _server_request_complete_cb(h1_codec_request_state_t *hrs, bool cancelled);
 static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context);
@@ -866,7 +865,7 @@ static int _server_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo
 // Called with decoded body data.  This may be called multiple times as body
 // data becomes available.
 //
-static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t offset, size_t len,
+static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t len,
                               bool more)
 {
     _server_request_t       *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
@@ -879,15 +878,6 @@ static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b
            "[C%"PRIu64"][L%"PRIu64"] HTTP response body received len=%zu.",
            hconn->conn_id, hconn->in_link_id, len);
 
-    if (offset) {
-        // dispatch assumes all body data starts at the buffer base so it cannot deal with offsets.
-        // Remove the offset by shifting the content of the head buffer forward
-        //
-        qd_buffer_t *head = DEQ_HEAD(*body);
-        memmove(qd_buffer_base(head), qd_buffer_base(head) + offset, qd_buffer_size(head) - offset);
-        head->size -= offset;
-    }
-
     //
     // Compose a DATA performative for this section of the stream
     //


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org