You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/04 18:58:17 UTC

[qpid-dispatch] 31/32: Dataplane - Added the body_data data structure for reading streaming messages. WIP - The following functions (in message.c) need to be implemented: find_last_buffer qd_message_body_data_iterator qd_message_body_data_buffer_count qd_message_body_data_buffers qd_message_body_data_release

This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit ee033ef6ef931e7fb9c3757bc220bc4ecc42632f
Author: Ted Ross <tr...@apache.org>
AuthorDate: Sun Jul 26 21:02:36 2020 -0400

    Dataplane - Added the body_data data structure for reading streaming messages. WIP - The following functions (in message.c) need to be implemented: find_last_buffer qd_message_body_data_iterator qd_message_body_data_buffer_count qd_message_body_data_buffers qd_message_body_data_release
---
 include/qpid/dispatch/message.h  |  80 ++++++++++++++++++-----
 src/adaptors/reference_adaptor.c |  33 ++++++++--
 src/message.c                    | 135 +++++++++++++++++++++++++++++++++++++++
 src/message_private.h            |  46 ++++++++-----
 4 files changed, 258 insertions(+), 36 deletions(-)

diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index f9e61d1..f409302 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -59,7 +59,8 @@
 
 // Callback for status change (confirmed persistent, loaded-in-memory, etc.)
 
-typedef struct qd_message_t qd_message_t;
+typedef struct qd_message_t           qd_message_t;
+typedef struct qd_message_body_data_t qd_message_body_data_t;
 
 /** Amount of message to be parsed.  */
 typedef enum {
@@ -292,31 +293,78 @@ int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field);
 
 
 /**
- * qd_message_read_body
+ * qd_message_body_data_iterator
  *
- * Populate Proton raw buffers from the body section in a streaming fashion (i.e. repeated 
- * invocations yield new seqments of the content stream).  The buffers will be left in place
- * in the message until they are explicitly released.
+ * Return an iterator that references the content (not the performative headers)
+ * of the entire body-data section.
  *
- * @param msg Pointer to a message
- * @param buffers An array of raw-buffer descriptors to be written
- * @param buffer_count The number of descriptors supplied in buffers
- * @return The number of raw buffers written.
+ * The returned iterator must eventually be freed by the caller.
+ *
+ * @param body_data Pointer to a body_data object produced by qd_message_next_body_data
+ * @return Pointer to an iterator referencing the body_data content
+ */
+qd_iterator_t *qd_message_body_data_iterator(const qd_message_body_data_t *body_data);
+
+
+/**
+ * qd_message_body_data_buffer_count
+ *
+ * Return the number of buffers that are needed to hold this body-data's content.
+ *
+ * @param body_data Pointer to a body_data object produced by qd_message_next_body_data
+ * @return Number of pn_raw_buffers needed to contain the entire content of this body_data.
+ */
+int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data);
+
+
+/**
+ * qd_message_body_data_buffers
+ *
+ * Populate an array of pn_raw_buffer_t objects with references to the body_data's content.
+ *
+ * @param body_data Pointer to a body_data object produced by qd_message_next_body_data
+ * @param buffers Pointer to an array of pn_raw_buffer_t objects
+ * @param offset The offset (in the body_data's buffer set) from which copying should begin
+ * @param count The number of pn_raw_buffer_t objects in the buffers array
+ * @return The number of pn_raw_buffer_t objects that were overwritten
+ */
+int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffer_t *buffers, int offset, int count);
+
+
+/**
+ * qd_message_body_data_release
+ *
+ * Release buffers that were associated with a body-data section.  It is not required that body-data
+ * objects be released in the same order in which they were offered.
+ *
+ * Once this function is called, the caller must drop its reference to the body_data object
+ * and not use it again.
+ *
+ * @param body_data Pointer to a body data object returned by qd_message_next_body_data
  */
-int qd_message_read_body(qd_message_t *msg, pn_raw_buffer_t *buffers, int buffer_count);
+void qd_message_body_data_release(qd_message_body_data_t *body_data);
+
+
+typedef enum {
+    QD_MESSAGE_BODY_DATA_OK,           // A valid body data object has been returned
+    QD_MESSAGE_BODY_DATA_INCOMPLETE,   // The next body data is incomplete, try again later
+    QD_MESSAGE_BODY_DATA_NO_MORE,      // There are no more body data objects in this stream
+    QD_MESSAGE_BODY_DATA_INVALID,      // The next body data is invalid, the stream is corrupted
+    QD_MESSAGE_BODY_DATA_NOT_DATA      // The body of the message is not a DATA segment
+} qd_message_body_data_result_t;
 
 
 /**
- * qd_message_release_body
+ * qd_message_next_body_data
  *
- * Release buffers that were aliased by Proton raw buffers.  The buffers in the message that
- * have been fully read will have their reference counts decreased so they may be freed
+ * Get the next body-data section from this streaming message.  Return the result and,
+ * when the result is OK, the completed body_data object.
  *
  * @param msg Pointer to a message
- * @param buffers An array of raw-buffer descriptors previously returned by qd_message_read_body
- * @param buffer_count The number of descriptors in the array that contained data
+ * @param body_data Output pointer to a body_data object (or 0 if not OK)
+ * @return The body_data_result describing the result of this operation
  */
-void qd_message_release_body(qd_message_t *msg, pn_raw_buffer_t *buffers, int buffer_count);
+qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *msg, qd_message_body_data_t **body_data);
 
 
 /** Put string representation of a message suitable for logging in buffer.
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index a0032a1..2a3a975 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -248,22 +248,47 @@ static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, qdr_delivery_t
     switch (status) {
     case QD_MESSAGE_DEPTH_OK: {
         printf("qdr_ref_deliver: depth ok\n");
-        if (qd_message_receive_complete(msg)) {
-            qd_iterator_t *body_iter = qd_message_field_iterator(msg, QD_FIELD_BODY);
+        qd_message_body_data_t        *body_data;
+        qd_message_body_data_result_t  body_data_result;
+        body_data_result = qd_message_next_body_data(msg, &body_data);
+
+        switch (body_data_result) {
+        case QD_MESSAGE_BODY_DATA_OK: {
+            qd_iterator_t *body_iter = qd_message_body_data_iterator(body_data);
             char *body = (char*) qd_iterator_copy(body_iter);
-            printf("qdr_ref_deliver: complete message received, body=%s\n", body);
+            printf("qdr_ref_deliver: message body-data received: %s\n", body);
             free(body);
             qd_iterator_free(body_iter);
+            break;
+        }
+            
+        case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+            printf("qdr_ref_deliver: body-data incomplete\n");
+            break;
+
+        case QD_MESSAGE_BODY_DATA_NO_MORE:
             qd_message_set_send_complete(msg);
             qdr_link_flow(adaptor->core, link, 1, false);
             return PN_ACCEPTED; // This will cause the delivery to be settled
+            
+        case QD_MESSAGE_BODY_DATA_INVALID:
+            printf("qdr_ref_deliver: body-data invalid\n");
+            qdr_link_flow(adaptor->core, link, 1, false);
+            return PN_REJECTED;
+
+        case QD_MESSAGE_BODY_DATA_NOT_DATA:
+            printf("qdr_ref_deliver: body not data\n");
+            qdr_link_flow(adaptor->core, link, 1, false);
+            return PN_REJECTED;
         }
+
         break;
     }
 
     case QD_MESSAGE_DEPTH_INVALID:
         printf("qdr_ref_deliver: message invalid\n");
         qdr_link_flow(adaptor->core, link, 1, false);
+        return PN_REJECTED;
         break;
 
     case QD_MESSAGE_DEPTH_INCOMPLETE:
@@ -489,4 +514,4 @@ void qdr_ref_adaptor_final(void *adaptor_context)
 /**
  * Declare the adaptor so that it will self-register on process startup.
  */
-QDR_CORE_ADAPTOR_DECLARE("ref-adaptor", qdr_ref_adaptor_init, qdr_ref_adaptor_final)
+//QDR_CORE_ADAPTOR_DECLARE("ref-adaptor", qdr_ref_adaptor_init, qdr_ref_adaptor_final)
diff --git a/src/message.c b/src/message.c
index 2b758b0..364ec21 100644
--- a/src/message.c
+++ b/src/message.c
@@ -86,6 +86,7 @@ PN_HANDLE(PN_DELIVERY_CTX)
 
 ALLOC_DEFINE_CONFIG(qd_message_t, sizeof(qd_message_pvt_t), 0, 0);
 ALLOC_DEFINE(qd_message_content_t);
+ALLOC_DEFINE(qd_message_body_data_t);
 
 typedef void (*buffer_process_t) (void *context, const unsigned char *base, int length);
 
@@ -1092,6 +1093,8 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
     if (!copy)
         return 0;
 
+    ZERO(copy);
+
     qd_buffer_list_clone(&copy->ma_to_override, &msg->ma_to_override);
     qd_buffer_list_clone(&copy->ma_trace, &msg->ma_trace);
     qd_buffer_list_clone(&copy->ma_ingress, &msg->ma_ingress);
@@ -2284,6 +2287,138 @@ int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field)
 }
 
 
+/**
+ * find_last_buffer
+ *
+ * Given a field location, find the following:
+ *
+ *  - *cursor - The pointer to the octet _past_ the last octet in the field.  If this is the last octet in
+ *              the buffer, the cursor must point one octet past the buffer.
+ *  - *buffer - The last buffer that contains content for this field.
+ *
+ * Important:  If the last octet of the field is the last octet of a buffer and there are more buffers in the
+ * buffer list, *buffer _must_ refer to the buffer that contains the last octet of the field and *cursor must
+ * point at the octet following that octet, even if it points past the end of the buffer.
+ */
+static void find_last_buffer(qd_field_location_t *location, unsigned char **cursor, qd_buffer_t **buffer)
+{
+    // TODO(WIP): not implemented yet - *cursor and *buffer are left unmodified by this stub.
+    
+}
+
+
+/**
+ * qd_message_body_data_iterator
+ *
+ * Given a body_data object, return an iterator that refers to the content of that body data.  This iterator
+ * shall not refer to the 3-byte performative header or the header for the vbin{8,32} field.
+ *
+ * The iterator must be freed eventually by the caller.  WIP: this stub currently always returns 0.
+ */
+qd_iterator_t *qd_message_body_data_iterator(const qd_message_body_data_t *body_data)
+{
+    return 0;
+}
+
+
+/**
+ * qd_message_body_data_buffer_count
+ *
+ * Return the number of buffers contained in the body_data object.  WIP: this stub currently always returns 0.
+ */
+int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data)
+{
+    return 0;
+}
+
+
+/**
+ * qd_message_body_data_buffers
+ *
+ * Populate the provided array of pn_raw_buffers with the addresses and lengths of the buffers in the body_data
+ * object.  Don't fill more than count raw_buffers with data.  Start at offset from the zero-th buffer in the
+ * body_data.  WIP: this stub currently writes nothing to buffers and always returns 0.
+ */
+int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffer_t *buffers, int offset, int count)
+{
+    return 0;
+}
+
+
+/**
+ * qd_message_body_data_release
+ *
+ * Decrement the fanout ref-counts for all of the buffers referred to in the body_data.  If any have reached zero,
+ * remove them from the buffer list and free them.  Never dec-ref the last buffer in the content's buffer list.  WIP: this stub currently does nothing.
+ */
+void qd_message_body_data_release(qd_message_body_data_t *body_data)
+{
+}
+
+
+qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd_message_body_data_t **out_body_data)
+{
+    qd_message_pvt_t       *msg       = (qd_message_pvt_t*) in_msg;
+    qd_message_body_data_t *body_data = 0;
+
+    // Documented contract: *out_body_data is 0 unless the result is QD_MESSAGE_BODY_DATA_OK.
+    *out_body_data = 0;
+
+    if (!msg->body_cursor) {
+        // We haven't returned a body-data record for this message yet.
+        qd_message_depth_status_t status = qd_message_check_depth(in_msg, QD_DEPTH_BODY);
+        if (status == QD_MESSAGE_DEPTH_OK) {
+            body_data = new_qd_message_body_data_t();
+            ZERO(body_data);
+            body_data->owning_message = msg;
+            body_data->section        = msg->content->section_body;
+
+            find_last_buffer(&body_data->section, &msg->body_cursor, &msg->body_buffer);
+            body_data->last_buffer = msg->body_buffer;
+
+            assert(DEQ_SIZE(msg->body_data_list) == 0);
+            DEQ_INSERT_TAIL(msg->body_data_list, body_data);
+            *out_body_data = body_data;
+            return QD_MESSAGE_BODY_DATA_OK;
+        } else if (status == QD_MESSAGE_DEPTH_INCOMPLETE)
+            return QD_MESSAGE_BODY_DATA_INCOMPLETE;
+        else if (status == QD_MESSAGE_DEPTH_INVALID)
+            return QD_MESSAGE_BODY_DATA_INVALID;
+    }
+
+    // Check whether another body-data section follows the saved buffer/cursor position.
+    qd_field_location_t location;
+    qd_section_status_t section_status = message_section_check(&msg->body_buffer, &msg->body_cursor,
+                                                               BODY_DATA_SHORT, 3, TAGS_BINARY,
+                                                               &location);
+
+    switch (section_status) {
+    case QD_SECTION_INVALID:
+    case QD_SECTION_NO_MATCH:
+        return QD_MESSAGE_BODY_DATA_INVALID;
+
+    case QD_SECTION_MATCH:
+        body_data = new_qd_message_body_data_t();
+        ZERO(body_data);
+        body_data->owning_message = msg;
+        body_data->section        = location;
+        find_last_buffer(&body_data->section, &msg->body_cursor, &msg->body_buffer);
+        body_data->last_buffer = msg->body_buffer;
+        DEQ_INSERT_TAIL(msg->body_data_list, body_data);
+        *out_body_data = body_data;
+        return QD_MESSAGE_BODY_DATA_OK;
+
+    case QD_SECTION_NEED_MORE:
+        if (msg->content->receive_complete)
+            return QD_MESSAGE_BODY_DATA_NO_MORE;
+        else
+            return QD_MESSAGE_BODY_DATA_INCOMPLETE;
+    }
+
+    return QD_MESSAGE_BODY_DATA_NO_MORE;
+}
+
+
 int qd_message_read_body(qd_message_t *in_msg, pn_raw_buffer_t* buffers, int length)
 {
     qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
diff --git a/src/message_private.h b/src/message_private.h
index d2b62f7..6dc901a 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -24,6 +24,8 @@
 #include <qpid/dispatch/threading.h>
 #include <qpid/dispatch/atomic.h>
 
+typedef struct qd_message_pvt_t qd_message_pvt_t;
+
 /** @file
  * Message representation.
  * 
@@ -59,6 +61,16 @@ typedef struct {
 } qd_field_location_t;
 
 
+struct qd_message_body_data_t {
+    DEQ_LINKS(qd_message_body_data_t);    // Linkage for membership in a qd_message_body_data_list_t
+    qd_message_pvt_t    *owning_message;  // Message that qd_message_next_body_data produced this record for
+    qd_field_location_t  section;         // Location of this body-data section in the message content
+    qd_buffer_t         *last_buffer;     // Last buffer holding content for this section (from find_last_buffer)
+};
+
+ALLOC_DECLARE(qd_message_body_data_t);
+DEQ_DECLARE(qd_message_body_data_t, qd_message_body_data_list_t);
+
 // TODO - consider using pointers to qd_field_location_t below to save memory
 // TODO - provide a way to allocate a message without a lock for the link-routing case.
 //        It's likely that link-routing will cause no contention for the message content.
@@ -93,7 +105,6 @@ typedef struct {
     qd_field_location_t  field_group_id;
     qd_field_location_t  field_group_sequence;
     qd_field_location_t  field_reply_to_group_id;
-    qd_field_location_t  body;                            // The body of the message
 
     qd_buffer_t         *parse_buffer;                    // Pointer to the buffer where parsing should resume, if needed
     unsigned char       *parse_cursor;                    // Pointer to octet in parse_buffer where parsing should resume, if needed
@@ -126,21 +137,24 @@ typedef struct {
     uint8_t              priority;                       // The priority of this message
 } qd_message_content_t;
 
-typedef struct {
-    qd_iterator_pointer_t cursor;          // A pointer to the current location of the outgoing byte stream.
-    qd_message_depth_t    message_depth;   // What is the depth of the message that has been received so far
-    qd_message_depth_t    sent_depth;      // How much of the message has been sent?  QD_DEPTH_NONE means nothing has been sent so far, QD_DEPTH_HEADER means the header has already been sent, dont send it again and so on.
-    qd_message_content_t *content;         // The actual content of the message. The content is never copied
-    qd_buffer_list_t      ma_to_override;  // to field in outgoing message annotations.
-    qd_buffer_list_t      ma_trace;        // trace list in outgoing message annotations
-    qd_buffer_list_t      ma_ingress;      // ingress field in outgoing message annotations
-    int                   ma_phase;        // phase for the override address
-    qd_field_location_t   body_section;    // Location of the current parsed body section
-    bool                  strip_annotations_in;
-    bool                  send_complete;   // Has the message been completely received and completely sent?
-    bool                  tag_sent;        // Tags are sent
-    bool                  is_fanout;       // If msg is an outgoing fanout
-} qd_message_pvt_t;
+struct qd_message_pvt_t {
+    qd_iterator_pointer_t        cursor;          // A pointer to the current location of the outgoing byte stream.
+    qd_message_depth_t           message_depth;   // What is the depth of the message that has been received so far
+    qd_message_depth_t           sent_depth;      // How much of the message has been sent?  QD_DEPTH_NONE means nothing has been sent so far, QD_DEPTH_HEADER means the header has already been sent, don't send it again and so on.
+    qd_message_content_t        *content;         // The actual content of the message. The content is never copied
+    qd_buffer_list_t             ma_to_override;  // to field in outgoing message annotations.
+    qd_buffer_list_t             ma_trace;        // trace list in outgoing message annotations
+    qd_buffer_list_t             ma_ingress;      // ingress field in outgoing message annotations
+    int                          ma_phase;        // phase for the override address
+    qd_message_body_data_list_t  body_data_list;  // Body-data records handed out by qd_message_next_body_data.  TODO - move this to the content for one-time parsing (TLR)
+    qd_message_body_data_t      *next_body_data;  // NOTE(review): not referenced by the code visible in this change - purpose to be confirmed
+    unsigned char               *body_cursor;     // Parse position used by qd_message_next_body_data; 0 means no body-data has been returned yet
+    qd_buffer_t                 *body_buffer;     // Buffer passed to the section parser together with body_cursor
+    bool                         strip_annotations_in;
+    bool                         send_complete;   // Has the message been completely received and completely sent?
+    bool                         tag_sent;        // Tags are sent
+    bool                         is_fanout;       // If msg is an outgoing fanout
+};
 
 ALLOC_DECLARE(qd_message_t);
 ALLOC_DECLARE(qd_message_content_t);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org