You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/07 21:34:55 UTC

[qpid-dispatch] 01/02: DISPATCH-1742 - Completed implementation of outbound streaming path

This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 5d021e224119f78ce043d7c1fc4d94b76f49e7c0
Author: Ted Ross <tr...@apache.org>
AuthorDate: Fri Aug 7 17:30:34 2020 -0400

    DISPATCH-1742 - Completed implementation of outbound streaming path
---
 src/adaptors/reference_adaptor.c |  89 ++++++++++++++++++++++-----------
 src/message.c                    | 103 ++++++++++++++++++++++++++++++++++-----
 src/message_private.h            |   1 +
 3 files changed, 154 insertions(+), 39 deletions(-)

diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index b6b0c1b..3c05bd8 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -247,39 +247,72 @@ static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, qdr_delivery_t
 
     switch (status) {
     case QD_MESSAGE_DEPTH_OK: {
+        //
+        // At least one complete body performative has arrived.  It is now safe to switch
+        // over to the per-message extraction of body-data segments.
+        //
         printf("qdr_ref_deliver: depth ok\n");
         qd_message_body_data_t        *body_data;
         qd_message_body_data_result_t  body_data_result;
-        body_data_result = qd_message_next_body_data(msg, &body_data);
-
-        switch (body_data_result) {
-        case QD_MESSAGE_BODY_DATA_OK: {
-            qd_iterator_t *body_iter = qd_message_body_data_iterator(body_data);
-            char *body = (char*) qd_iterator_copy(body_iter);
-            printf("qdr_ref_deliver: message body-data received: %s\n", body);
-            free(body);
-            qd_iterator_free(body_iter);
-            break;
-        }
+
+        //
+        // Process as many body-data segments as are available.
+        //
+        while (true) {
+            body_data_result = qd_message_next_body_data(msg, &body_data);
+
+            switch (body_data_result) {
+            case QD_MESSAGE_BODY_DATA_OK: {
+                //
+                // We have a new valid body-data segment.  Handle it.
+                //
+                printf("qdr_ref_deliver: body_data_buffer_count: %d\n", qd_message_body_data_buffer_count(body_data));
+
+                qd_iterator_t *body_iter = qd_message_body_data_iterator(body_data);
+                char *body = (char*) qd_iterator_copy(body_iter);
+                printf("qdr_ref_deliver: message body-data received: %s\n", body);
+                free(body);
+                qd_iterator_free(body_iter);
+                qd_message_body_data_release(body_data);
+                break;
+            }
             
-        case QD_MESSAGE_BODY_DATA_INCOMPLETE:
-            printf("qdr_ref_deliver: body-data incomplete\n");
-            break;
-
-        case QD_MESSAGE_BODY_DATA_NO_MORE:
-            qd_message_set_send_complete(msg);
-            qdr_link_flow(adaptor->core, link, 1, false);
-            return PN_ACCEPTED; // This will cause the delivery to be settled
+            case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+                //
+                // A new segment has not completely arrived yet.  Check again later.
+                //
+                printf("qdr_ref_deliver: body-data incomplete\n");
+                return 0;
+
+            case QD_MESSAGE_BODY_DATA_NO_MORE:
+                //
+                // We have already handled the last body-data segment for this delivery.
+                // Complete the "sending" of this delivery and replenish credit.
+                //
+                // Note that depending on the adaptor, it might be desirable to delay the
+                // acceptance and settlement of this delivery until a later event (e.g. when
+                // a requested action has completed).
+                //
+                qd_message_set_send_complete(msg);
+                qdr_link_flow(adaptor->core, link, 1, false);
+                return PN_ACCEPTED; // This will cause the delivery to be settled
             
-        case QD_MESSAGE_BODY_DATA_INVALID:
-            printf("qdr_ref_deliver: body-data invalid\n");
-            qdr_link_flow(adaptor->core, link, 1, false);
-            return PN_REJECTED;
-
-        case QD_MESSAGE_BODY_DATA_NOT_DATA:
-            printf("qdr_ref_deliver: body not data\n");
-            qdr_link_flow(adaptor->core, link, 1, false);
-            return PN_REJECTED;
+            case QD_MESSAGE_BODY_DATA_INVALID:
+                //
+                // The body-data is corrupt in some way.  Stop handling the delivery and reject it.
+                //
+                printf("qdr_ref_deliver: body-data invalid\n");
+                qdr_link_flow(adaptor->core, link, 1, false);
+                return PN_REJECTED;
+
+            case QD_MESSAGE_BODY_DATA_NOT_DATA:
+                //
+                // Valid data was seen, but it is not a body-data performative.  Reject the delivery.
+                //
+                printf("qdr_ref_deliver: body not data\n");
+                qdr_link_flow(adaptor->core, link, 1, false);
+                return PN_REJECTED;
+            }
         }
 
         break;
diff --git a/src/message.c b/src/message.c
index 24aa928..69312f3 100644
--- a/src/message.c
+++ b/src/message.c
@@ -658,7 +658,8 @@ static qd_section_status_t message_section_check(qd_buffer_t         **buffer,
                                                  const unsigned char  *pattern,
                                                  int                   pattern_length,
                                                  const unsigned char  *expected_tags,
-                                                 qd_field_location_t  *location)
+                                                 qd_field_location_t  *location,
+                                                 bool                  dup_ok)
 {
     if (!*cursor || !can_advance(cursor, buffer))
         return QD_SECTION_NEED_MORE;
@@ -691,7 +692,7 @@ static qd_section_status_t message_section_check(qd_buffer_t         **buffer,
     if (*expected_tags == 0)
         return QD_SECTION_INVALID;  // Error: Unexpected tag
 
-    if (location->parsed)
+    if (location->parsed && !dup_ok)
         return QD_SECTION_INVALID;  // Error: Duplicate section
 
     //
@@ -1916,9 +1917,9 @@ static qd_message_depth_status_t message_check_depth_LH(qd_message_content_t *co
         return QD_MESSAGE_DEPTH_OK;
 
     qd_section_status_t rc;
-    rc = message_section_check(&content->parse_buffer, &content->parse_cursor, short_pattern, SHORT, expected_tags, location);
+    rc = message_section_check(&content->parse_buffer, &content->parse_cursor, short_pattern, SHORT, expected_tags, location, false);
     if (rc == QD_SECTION_NO_MATCH)  // try the alternative
-        rc = message_section_check(&content->parse_buffer, &content->parse_cursor, long_pattern,  LONG,  expected_tags, location);
+        rc = message_section_check(&content->parse_buffer, &content->parse_cursor, long_pattern,  LONG,  expected_tags, location, false);
 
     if (rc == QD_SECTION_MATCH || (optional && rc == QD_SECTION_NO_MATCH)) {
         content->parse_depth = depth;
@@ -2302,8 +2303,49 @@ int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field)
  */
 static void find_last_buffer(qd_field_location_t *location, unsigned char **cursor, qd_buffer_t **buffer)
 {
-    //qd_buffer_t *buf = location->buffer;
-    
+    qd_buffer_t *buf       = location->buffer;
+    size_t       remaining = location->hdr_length + location->length;
+
+    while (!!buf && remaining > 0) {
+        size_t this_buf_size = qd_buffer_size(buf) - (buf == location->buffer ? location->offset : 0);
+        if (remaining <= this_buf_size) {
+            *buffer = buf;
+            *cursor = qd_buffer_base(buf) + (buf == location->buffer ? location->offset : 0) + remaining;
+            return;
+        }
+        remaining -= this_buf_size;
+        buf = DEQ_NEXT(buf);
+    }
+
+    assert(false);  // The field should already have been validated as complete.
+}
+
+
+void trim_body_data_headers(qd_message_body_data_t *body_data)
+{
+    const qd_field_location_t *location = &body_data->section;
+    qd_buffer_t               *buffer   = location->buffer;
+    unsigned char             *cursor   = qd_buffer_base(buffer) + location->offset;
+
+    bool good = advance(&cursor, &buffer, location->hdr_length);
+    assert(good);
+    if (good) {
+        unsigned char tag = 0;
+        next_octet(&cursor, &buffer, &tag);
+        if (tag == QD_AMQP_VBIN8)
+            advance(&cursor, &buffer, 1);
+        else if (tag == QD_AMQP_VBIN32)
+            advance(&cursor, &buffer, 4);
+
+        can_advance(&cursor, &buffer); // bump cursor to the next buffer if necessary
+
+        body_data->payload.buffer     = buffer;
+        body_data->payload.offset     = cursor - qd_buffer_base(buffer);
+        body_data->payload.length     = location->length;
+        body_data->payload.hdr_length = 0;
+        body_data->payload.parsed     = true;
+        body_data->payload.tag        = tag;
+    }
 }
 
 
@@ -2317,18 +2359,27 @@ static void find_last_buffer(qd_field_location_t *location, unsigned char **curs
  */
 qd_iterator_t *qd_message_body_data_iterator(const qd_message_body_data_t *body_data)
 {
-    return 0;
+    const qd_field_location_t *location = &body_data->payload;
+
+    return qd_iterator_buffer(location->buffer, location->offset, location->length, ITER_VIEW_ALL);
 }
 
 
 /**
  * qd_message_body_data_buffer_count
  *
- * Return the number of buffers contained in the body_data object.
+ * Return the number of buffers contained in the payload portion of the body_data object.
  */
 int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data)
 {
-    return 0;
+    int count = 1;
+    qd_buffer_t *buffer = body_data->payload.buffer;
+    while (!!buffer && buffer != body_data->last_buffer) {
+        buffer = DEQ_NEXT(buffer);
+        count++;
+    }
+
+    return count;
 }
 
 
@@ -2341,7 +2392,34 @@ int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data)
  */
 int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffer_t *buffers, int offset, int count)
 {
-    return 0;
+    int          actual_count = 0;
+    qd_buffer_t *buffer       = body_data->payload.buffer;
+
+    //
+    // Skip the offset
+    //
+    while (offset > 0 && !!buffer) {
+        buffer = DEQ_NEXT(buffer);
+        offset--;
+    }
+
+    //
+    // Fill the buffer array
+    //
+    int idx = 0;
+    while (idx < count && !!buffer) {
+        buffers[idx].context  = 0;
+        buffers[idx].bytes    = (char*) qd_buffer_base(buffer) + (buffer == body_data->payload.buffer ? body_data->payload.offset : 0);
+        buffers[idx].capacity = BUFFER_SIZE;
+        buffers[idx].size     = qd_buffer_size(buffer) - (buffer == body_data->payload.buffer ? body_data->payload.offset : 0);
+        buffers[idx].offset   = 0;
+
+        buffer = DEQ_NEXT(buffer);
+        actual_count++;
+        idx++;
+    }
+
+    return actual_count;
 }
 
 
@@ -2353,6 +2431,7 @@ int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffe
  */
 void qd_message_body_data_release(qd_message_body_data_t *body_data)
 {
+    free_qd_message_body_data_t(body_data);
 }
 
 
@@ -2374,6 +2453,7 @@ qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd
 
             find_last_buffer(&body_data->section, &msg->body_cursor, &msg->body_buffer);
             body_data->last_buffer = msg->body_buffer;
+            trim_body_data_headers(body_data);
 
             assert(DEQ_SIZE(msg->body_data_list) == 0);
             DEQ_INSERT_TAIL(msg->body_data_list, body_data);
@@ -2390,7 +2470,7 @@ qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd
 
     section_status = message_section_check(&msg->body_buffer, &msg->body_cursor,
                                            BODY_DATA_SHORT, 3, TAGS_BINARY,
-                                           &location);
+                                           &location, true);
 
     switch (section_status) {
     case QD_SECTION_INVALID:
@@ -2404,6 +2484,7 @@ qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd
         body_data->section        = location;
         find_last_buffer(&body_data->section, &msg->body_cursor, &msg->body_buffer);
         body_data->last_buffer = msg->body_buffer;
+        trim_body_data_headers(body_data);
         DEQ_INSERT_TAIL(msg->body_data_list, body_data);
         *out_body_data = body_data;
         return QD_MESSAGE_BODY_DATA_OK;
diff --git a/src/message_private.h b/src/message_private.h
index 6dc901a..20f4ce4 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -65,6 +65,7 @@ struct qd_message_body_data_t {
     DEQ_LINKS(qd_message_body_data_t);    // Linkage to form a DEQ
     qd_message_pvt_t    *owning_message;  // Pointer to the owning message
     qd_field_location_t  section;         // Section descriptor for the field
+    qd_field_location_t  payload;         // Descriptor for the payload of the body data
     qd_buffer_t         *last_buffer;     // Pointer to the last buffer in the field
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org