You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/07 21:34:54 UTC

[qpid-dispatch] branch dev-protocol-adaptors updated (36da843 -> 492ae4b)

This is an automated email from the ASF dual-hosted git repository.

tross pushed a change to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git.


    from 36da843  DISPATCH-1743 - Introduce an HTTP/2 adaptor. This adaptor will act as the HTTP/2 <--> AMQP bridge
     new 5d021e2  DISPATCH-1742 - Completed implementation of outbound streaming path
     new 492ae4b  DISPATCH-1742 - Fixed compilation error

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/adaptors/http_adaptor.c      |   4 +-
 src/adaptors/reference_adaptor.c |  89 ++++++++++++++++++++++-----------
 src/message.c                    | 103 ++++++++++++++++++++++++++++++++++-----
 src/message_private.h            |   1 +
 4 files changed, 157 insertions(+), 40 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 02/02: DISPATCH-1742 - Fixed compilation error

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 492ae4bbeda155e02bf91a42fd41233a60c17f10
Author: Ted Ross <tr...@apache.org>
AuthorDate: Fri Aug 7 17:33:56 2020 -0400

    DISPATCH-1742 - Fixed compilation error
---
 src/adaptors/http_adaptor.c | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/adaptors/http_adaptor.c b/src/adaptors/http_adaptor.c
index 3c138ca..86708c8 100644
--- a/src/adaptors/http_adaptor.c
+++ b/src/adaptors/http_adaptor.c
@@ -419,7 +419,9 @@ static void write_buffers(qdr_http_connection_t *conn)
     qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Writing %i bytes in write_buffers", conn->conn_id, total_bytes);
     if (i >0) {
         size_t num_buffers_written = pn_raw_connection_write_buffers(session_data->conn->pn_raw_conn, raw_buffers, num_buffs);
-        assert(num_buffs == num_buffers_written);
+        if (num_buffs != num_buffers_written) {
+            assert(false);
+        }
     }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 01/02: DISPATCH-1742 - Completed implementation of outbound streaming path

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 5d021e224119f78ce043d7c1fc4d94b76f49e7c0
Author: Ted Ross <tr...@apache.org>
AuthorDate: Fri Aug 7 17:30:34 2020 -0400

    DISPATCH-1742 - Completed implementation of outbound streaming path
---
 src/adaptors/reference_adaptor.c |  89 ++++++++++++++++++++++-----------
 src/message.c                    | 103 ++++++++++++++++++++++++++++++++++-----
 src/message_private.h            |   1 +
 3 files changed, 154 insertions(+), 39 deletions(-)

diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index b6b0c1b..3c05bd8 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -247,39 +247,72 @@ static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, qdr_delivery_t
 
     switch (status) {
     case QD_MESSAGE_DEPTH_OK: {
+        //
+        // At least one complete body performative has arrived.  It is now safe to switch
+        // over to the per-message extraction of body-data segments.
+        //
         printf("qdr_ref_deliver: depth ok\n");
         qd_message_body_data_t        *body_data;
         qd_message_body_data_result_t  body_data_result;
-        body_data_result = qd_message_next_body_data(msg, &body_data);
-
-        switch (body_data_result) {
-        case QD_MESSAGE_BODY_DATA_OK: {
-            qd_iterator_t *body_iter = qd_message_body_data_iterator(body_data);
-            char *body = (char*) qd_iterator_copy(body_iter);
-            printf("qdr_ref_deliver: message body-data received: %s\n", body);
-            free(body);
-            qd_iterator_free(body_iter);
-            break;
-        }
+
+        //
+        // Process as many body-data segments as are available.
+        //
+        while (true) {
+            body_data_result = qd_message_next_body_data(msg, &body_data);
+
+            switch (body_data_result) {
+            case QD_MESSAGE_BODY_DATA_OK: {
+                //
+                // We have a new valid body-data segment.  Handle it.
+                //
+                printf("qdr_ref_deliver: body_data_buffer_count: %d\n", qd_message_body_data_buffer_count(body_data));
+
+                qd_iterator_t *body_iter = qd_message_body_data_iterator(body_data);
+                char *body = (char*) qd_iterator_copy(body_iter);
+                printf("qdr_ref_deliver: message body-data received: %s\n", body);
+                free(body);
+                qd_iterator_free(body_iter);
+                qd_message_body_data_release(body_data);
+                break;
+            }
             
-        case QD_MESSAGE_BODY_DATA_INCOMPLETE:
-            printf("qdr_ref_deliver: body-data incomplete\n");
-            break;
-
-        case QD_MESSAGE_BODY_DATA_NO_MORE:
-            qd_message_set_send_complete(msg);
-            qdr_link_flow(adaptor->core, link, 1, false);
-            return PN_ACCEPTED; // This will cause the delivery to be settled
+            case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+                //
+                // A new segment has not completely arrived yet.  Check again later.
+                //
+                printf("qdr_ref_deliver: body-data incomplete\n");
+                return 0;
+
+            case QD_MESSAGE_BODY_DATA_NO_MORE:
+                //
+                // We have already handled the last body-data segment for this delivery.
+                // Complete the "sending" of this delivery and replenish credit.
+                //
+                // Note that depending on the adaptor, it might be desirable to delay the
+                // acceptance and settlement of this delivery until a later event (i.e. when
+                // a requested action has completed).
+                //
+                qd_message_set_send_complete(msg);
+                qdr_link_flow(adaptor->core, link, 1, false);
+                return PN_ACCEPTED; // This will cause the delivery to be settled
             
-        case QD_MESSAGE_BODY_DATA_INVALID:
-            printf("qdr_ref_deliver: body-data invalid\n");
-            qdr_link_flow(adaptor->core, link, 1, false);
-            return PN_REJECTED;
-
-        case QD_MESSAGE_BODY_DATA_NOT_DATA:
-            printf("qdr_ref_deliver: body not data\n");
-            qdr_link_flow(adaptor->core, link, 1, false);
-            return PN_REJECTED;
+            case QD_MESSAGE_BODY_DATA_INVALID:
+                //
+                // The body-data is corrupt in some way.  Stop handling the delivery and reject it.
+                //
+                printf("qdr_ref_deliver: body-data invalid\n");
+                qdr_link_flow(adaptor->core, link, 1, false);
+                return PN_REJECTED;
+
+            case QD_MESSAGE_BODY_DATA_NOT_DATA:
+                //
+                // Valid data was seen, but it is not a body-data performative.  Reject the delivery.
+                //
+                printf("qdr_ref_deliver: body not data\n");
+                qdr_link_flow(adaptor->core, link, 1, false);
+                return PN_REJECTED;
+            }
         }
 
         break;
diff --git a/src/message.c b/src/message.c
index 24aa928..69312f3 100644
--- a/src/message.c
+++ b/src/message.c
@@ -658,7 +658,8 @@ static qd_section_status_t message_section_check(qd_buffer_t         **buffer,
                                                  const unsigned char  *pattern,
                                                  int                   pattern_length,
                                                  const unsigned char  *expected_tags,
-                                                 qd_field_location_t  *location)
+                                                 qd_field_location_t  *location,
+                                                 bool                  dup_ok)
 {
     if (!*cursor || !can_advance(cursor, buffer))
         return QD_SECTION_NEED_MORE;
@@ -691,7 +692,7 @@ static qd_section_status_t message_section_check(qd_buffer_t         **buffer,
     if (*expected_tags == 0)
         return QD_SECTION_INVALID;  // Error: Unexpected tag
 
-    if (location->parsed)
+    if (location->parsed && !dup_ok)
         return QD_SECTION_INVALID;  // Error: Duplicate section
 
     //
@@ -1916,9 +1917,9 @@ static qd_message_depth_status_t message_check_depth_LH(qd_message_content_t *co
         return QD_MESSAGE_DEPTH_OK;
 
     qd_section_status_t rc;
-    rc = message_section_check(&content->parse_buffer, &content->parse_cursor, short_pattern, SHORT, expected_tags, location);
+    rc = message_section_check(&content->parse_buffer, &content->parse_cursor, short_pattern, SHORT, expected_tags, location, false);
     if (rc == QD_SECTION_NO_MATCH)  // try the alternative
-        rc = message_section_check(&content->parse_buffer, &content->parse_cursor, long_pattern,  LONG,  expected_tags, location);
+        rc = message_section_check(&content->parse_buffer, &content->parse_cursor, long_pattern,  LONG,  expected_tags, location, false);
 
     if (rc == QD_SECTION_MATCH || (optional && rc == QD_SECTION_NO_MATCH)) {
         content->parse_depth = depth;
@@ -2302,8 +2303,49 @@ int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field)
  */
 static void find_last_buffer(qd_field_location_t *location, unsigned char **cursor, qd_buffer_t **buffer)
 {
-    //qd_buffer_t *buf = location->buffer;
-    
+    qd_buffer_t *buf       = location->buffer;
+    size_t       remaining = location->hdr_length + location->length;
+
+    while (!!buf && remaining > 0) {
+        size_t this_buf_size = qd_buffer_size(buf) - (buf == location->buffer ? location->offset : 0);
+        if (remaining <= this_buf_size) {
+            *buffer = buf;
+            *cursor = qd_buffer_base(buf) + (buf == location->buffer ? location->offset : 0) + remaining;
+            return;
+        }
+        remaining -= this_buf_size;
+        buf = DEQ_NEXT(buf);
+    }
+
+    assert(false);  // The field should already have been validated as complete.
+}
+
+
+void trim_body_data_headers(qd_message_body_data_t *body_data)
+{
+    const qd_field_location_t *location = &body_data->section;
+    qd_buffer_t               *buffer   = location->buffer;
+    unsigned char             *cursor   = qd_buffer_base(buffer) + location->offset;
+
+    bool good = advance(&cursor, &buffer, location->hdr_length);
+    assert(good);
+    if (good) {
+        unsigned char tag = 0;
+        next_octet(&cursor, &buffer, &tag);
+        if (tag == QD_AMQP_VBIN8)
+            advance(&cursor, &buffer, 1);
+        else if (tag == QD_AMQP_VBIN32)
+            advance(&cursor, &buffer, 4);
+
+        can_advance(&cursor, &buffer); // bump cursor to the next buffer if necessary
+
+        body_data->payload.buffer     = buffer;
+        body_data->payload.offset     = cursor - qd_buffer_base(buffer);
+        body_data->payload.length     = location->length;
+        body_data->payload.hdr_length = 0;
+        body_data->payload.parsed     = true;
+        body_data->payload.tag        = tag;
+    }
 }
 
 
@@ -2317,18 +2359,27 @@ static void find_last_buffer(qd_field_location_t *location, unsigned char **curs
  */
 qd_iterator_t *qd_message_body_data_iterator(const qd_message_body_data_t *body_data)
 {
-    return 0;
+    const qd_field_location_t *location = &body_data->payload;
+
+    return qd_iterator_buffer(location->buffer, location->offset, location->length, ITER_VIEW_ALL);
 }
 
 
 /**
  * qd_message_body_data_buffer_count
  *
- * Return the number of buffers contained in the body_data object.
+ * Return the number of buffers contained in the payload portion of the body_data object.
  */
 int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data)
 {
-    return 0;
+    int count = 1;
+    qd_buffer_t *buffer = body_data->payload.buffer;
+    while (!!buffer && buffer != body_data->last_buffer) {
+        buffer = DEQ_NEXT(buffer);
+        count++;
+    }
+
+    return count;
 }
 
 
@@ -2341,7 +2392,34 @@ int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data)
  */
 int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffer_t *buffers, int offset, int count)
 {
-    return 0;
+    int          actual_count = 0;
+    qd_buffer_t *buffer       = body_data->payload.buffer;
+
+    //
+    // Skip the offset
+    //
+    while (offset > 0 && !!buffer) {
+        buffer = DEQ_NEXT(buffer);
+        offset--;
+    }
+
+    //
+    // Fill the buffer array
+    //
+    int idx = 0;
+    while (idx < count && !!buffer) {
+        buffers[idx].context  = 0;
+        buffers[idx].bytes    = (char*) qd_buffer_base(buffer) + (buffer == body_data->payload.buffer ? body_data->payload.offset : 0);
+        buffers[idx].capacity = BUFFER_SIZE;
+        buffers[idx].size     = qd_buffer_size(buffer) - (buffer == body_data->payload.buffer ? body_data->payload.offset : 0);
+        buffers[idx].offset   = 0;
+
+        buffer = DEQ_NEXT(buffer);
+        actual_count++;
+        idx++;
+    }
+
+    return actual_count;
 }
 
 
@@ -2353,6 +2431,7 @@ int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffe
  */
 void qd_message_body_data_release(qd_message_body_data_t *body_data)
 {
+    free_qd_message_body_data_t(body_data);
 }
 
 
@@ -2374,6 +2453,7 @@ qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd
 
             find_last_buffer(&body_data->section, &msg->body_cursor, &msg->body_buffer);
             body_data->last_buffer = msg->body_buffer;
+            trim_body_data_headers(body_data);
 
             assert(DEQ_SIZE(msg->body_data_list) == 0);
             DEQ_INSERT_TAIL(msg->body_data_list, body_data);
@@ -2390,7 +2470,7 @@ qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd
 
     section_status = message_section_check(&msg->body_buffer, &msg->body_cursor,
                                            BODY_DATA_SHORT, 3, TAGS_BINARY,
-                                           &location);
+                                           &location, true);
 
     switch (section_status) {
     case QD_SECTION_INVALID:
@@ -2404,6 +2484,7 @@ qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd
         body_data->section        = location;
         find_last_buffer(&body_data->section, &msg->body_cursor, &msg->body_buffer);
         body_data->last_buffer = msg->body_buffer;
+        trim_body_data_headers(body_data);
         DEQ_INSERT_TAIL(msg->body_data_list, body_data);
         *out_body_data = body_data;
         return QD_MESSAGE_BODY_DATA_OK;
diff --git a/src/message_private.h b/src/message_private.h
index 6dc901a..20f4ce4 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -65,6 +65,7 @@ struct qd_message_body_data_t {
     DEQ_LINKS(qd_message_body_data_t);    // Linkage to form a DEQ
     qd_message_pvt_t    *owning_message;  // Pointer to the owning message
     qd_field_location_t  section;         // Section descriptor for the field
+    qd_field_location_t  payload;         // Descriptor for the payload of the body data
     qd_buffer_t         *last_buffer;     // Pointer to the last buffer in the field
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org