You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/04 18:58:17 UTC
[qpid-dispatch] 31/32: Dataplane - Added the body_data data
structure for reading streaming messages. WIP - The following functions (in
message.c) need to be implemented: find_last_buffer
qd_message_body_data_iterator qd_message_body_data_buffer_count
qd_message_body_data_buffers qd_message_body_data_release
This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit ee033ef6ef931e7fb9c3757bc220bc4ecc42632f
Author: Ted Ross <tr...@apache.org>
AuthorDate: Sun Jul 26 21:02:36 2020 -0400
Dataplane - Added the body_data data structure for reading streaming messages. WIP - The following functions (in message.c) need to be implemented: find_last_buffer qd_message_body_data_iterator qd_message_body_data_buffer_count qd_message_body_data_buffers qd_message_body_data_release
---
include/qpid/dispatch/message.h | 80 ++++++++++++++++++-----
src/adaptors/reference_adaptor.c | 33 ++++++++--
src/message.c | 135 +++++++++++++++++++++++++++++++++++++++
src/message_private.h | 46 ++++++++-----
4 files changed, 258 insertions(+), 36 deletions(-)
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index f9e61d1..f409302 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -59,7 +59,8 @@
// Callback for status change (confirmed persistent, loaded-in-memory, etc.)
-typedef struct qd_message_t qd_message_t;
+typedef struct qd_message_t qd_message_t;
+typedef struct qd_message_body_data_t qd_message_body_data_t;
/** Amount of message to be parsed. */
typedef enum {
@@ -292,31 +293,78 @@ int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field);
/**
- * qd_message_read_body
+ * qd_message_body_data_iterator
*
- * Populate Proton raw buffers from the body section in a streaming fashion (i.e. repeated
- * invocations yield new seqments of the content stream). The buffers will be left in place
- * in the message until they are explicitly released.
+ * Return an iterator that references the content (not the performative headers)
+ * of the entire body-data section.
*
- * @param msg Pointer to a message
- * @param buffers An array of raw-buffer descriptors to be written
- * @param buffer_count The number of descriptors supplied in buffers
- * @return The number of raw buffers written.
+ * The returned iterator must eventually be freed by the caller.
+ *
+ * @param body_data Pointer to a body_data object produced by qd_message_next_body_data
+ * @return Pointer to an iterator referencing the body_data content
+ */
+qd_iterator_t *qd_message_body_data_iterator(const qd_message_body_data_t *body_data);
+
+
+/**
+ * qd_message_body_data_buffer_count
+ *
+ * Return the number of buffers that are needed to hold this body-data's content.
+ *
+ * @param body_data Pointer to a body_data object produced by qd_message_next_body_data
+ * @return Number of pn_raw_buffers needed to contain the entire content of this body_data.
+ */
+int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data);
+
+
+/**
+ * qd_message_body_data_buffers
+ *
+ * Populate an array of pn_raw_buffer_t objects with references to the body_data's content.
+ *
+ * @param body_data Pointer to a body_data object produced by qd_message_next_body_data
+ * @param buffers Pointer to an array of pn_raw_buffer_t objects
+ * @param offset The offset (in the body_data's buffer set) from which copying should begin
+ * @param count The number of pn_raw_buffer_t objects in the buffers array
+ * @return The number of pn_raw_buffer_t objects that were overwritten
+ */
+int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffer_t *buffers, int offset, int count);
+
+
+/**
+ * qd_message_body_data_release
+ *
+ * Release buffers that were associated with a body-data section. It is not required that body-data
+ * objects be released in the same order in which they were offered.
+ *
+ * Once this function is called, the caller must drop its reference to the body_data object
+ * and not use it again.
+ *
+ * @param body_data Pointer to a body data object returned by qd_message_next_body_data
*/
-int qd_message_read_body(qd_message_t *msg, pn_raw_buffer_t *buffers, int buffer_count);
+void qd_message_body_data_release(qd_message_body_data_t *body_data);
+
+
+typedef enum {
+ QD_MESSAGE_BODY_DATA_OK, // A valid body data object have been returned
+ QD_MESSAGE_BODY_DATA_INCOMPLETE, // The next body data is incomplete, try again later
+ QD_MESSAGE_BODY_DATA_NO_MORE, // There are no more body data objects in this stream
+ QD_MESSAGE_BODY_DATA_INVALID, // The next body data is invalid, the stream is corrupted
+ QD_MESSAGE_BODY_DATA_NOT_DATA // The body of the message is not a DATA segment
+} qd_message_body_data_result_t;
/**
- * qd_message_release_body
+ * qd_message_next_body_data
*
- * Release buffers that were aliased by Proton raw buffers. The buffers in the message that
- * have been fully read will have their reference counts decreased so they may be freed
+ * Get the next body-data section from this streaming message return the result and
+ * possibly the valid, completed body_data object.
*
* @param msg Pointer to a message
- * @param buffers An array of raw-buffer descriptors previously returned by qd_message_read_body
- * @param buffer_count The number of descriptors in the array that contained data
+ * @param body_data Output pointer to a body_data object (or 0 if not OK)
+ * @return The body_data_result describing the result of this operation
*/
-void qd_message_release_body(qd_message_t *msg, pn_raw_buffer_t *buffers, int buffer_count);
+qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *msg, qd_message_body_data_t **body_data);
/** Put string representation of a message suitable for logging in buffer.
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index a0032a1..2a3a975 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -248,22 +248,47 @@ static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, qdr_delivery_t
switch (status) {
case QD_MESSAGE_DEPTH_OK: {
printf("qdr_ref_deliver: depth ok\n");
- if (qd_message_receive_complete(msg)) {
- qd_iterator_t *body_iter = qd_message_field_iterator(msg, QD_FIELD_BODY);
+ qd_message_body_data_t *body_data;
+ qd_message_body_data_result_t body_data_result;
+ body_data_result = qd_message_next_body_data(msg, &body_data);
+
+ switch (body_data_result) {
+ case QD_MESSAGE_BODY_DATA_OK: {
+ qd_iterator_t *body_iter = qd_message_body_data_iterator(body_data);
char *body = (char*) qd_iterator_copy(body_iter);
- printf("qdr_ref_deliver: complete message received, body=%s\n", body);
+ printf("qdr_ref_deliver: message body-data received: %s\n", body);
free(body);
qd_iterator_free(body_iter);
+ break;
+ }
+
+ case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+ printf("qdr_ref_deliver: body-data incomplete\n");
+ break;
+
+ case QD_MESSAGE_BODY_DATA_NO_MORE:
qd_message_set_send_complete(msg);
qdr_link_flow(adaptor->core, link, 1, false);
return PN_ACCEPTED; // This will cause the delivery to be settled
+
+ case QD_MESSAGE_BODY_DATA_INVALID:
+ printf("qdr_ref_deliver: body-data invalid\n");
+ qdr_link_flow(adaptor->core, link, 1, false);
+ return PN_REJECTED;
+
+ case QD_MESSAGE_BODY_DATA_NOT_DATA:
+ printf("qdr_ref_deliver: body not data\n");
+ qdr_link_flow(adaptor->core, link, 1, false);
+ return PN_REJECTED;
}
+
break;
}
case QD_MESSAGE_DEPTH_INVALID:
printf("qdr_ref_deliver: message invalid\n");
qdr_link_flow(adaptor->core, link, 1, false);
+ return PN_REJECTED;
break;
case QD_MESSAGE_DEPTH_INCOMPLETE:
@@ -489,4 +514,4 @@ void qdr_ref_adaptor_final(void *adaptor_context)
/**
* Declare the adaptor so that it will self-register on process startup.
*/
-QDR_CORE_ADAPTOR_DECLARE("ref-adaptor", qdr_ref_adaptor_init, qdr_ref_adaptor_final)
+//QDR_CORE_ADAPTOR_DECLARE("ref-adaptor", qdr_ref_adaptor_init, qdr_ref_adaptor_final)
diff --git a/src/message.c b/src/message.c
index 2b758b0..364ec21 100644
--- a/src/message.c
+++ b/src/message.c
@@ -86,6 +86,7 @@ PN_HANDLE(PN_DELIVERY_CTX)
ALLOC_DEFINE_CONFIG(qd_message_t, sizeof(qd_message_pvt_t), 0, 0);
ALLOC_DEFINE(qd_message_content_t);
+ALLOC_DEFINE(qd_message_body_data_t);
typedef void (*buffer_process_t) (void *context, const unsigned char *base, int length);
@@ -1092,6 +1093,8 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
if (!copy)
return 0;
+ ZERO(copy);
+
qd_buffer_list_clone(©->ma_to_override, &msg->ma_to_override);
qd_buffer_list_clone(©->ma_trace, &msg->ma_trace);
qd_buffer_list_clone(©->ma_ingress, &msg->ma_ingress);
@@ -2284,6 +2287,138 @@ int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field)
}
+/**
+ * find_last_buffer
+ *
+ * Given a field location, find the following:
+ *
+ * - *cursor - The pointer to the octet _past_ the last octet in the field. If this is the last octet in
+ * the buffer, the cursor must point one octet past the buffer.
+ * - *buffer - The last buffer that contains content for this field.
+ *
+ * Important: If the last octet of the field is the last octet of a buffer and there are more buffers in the
+ * buffer list, *buffer _must_ refer to the buffer that contains the last octet of the field and *cursor must
+ * point at the octet following that octet, even if it points past the end of the buffer.
+ */
+static void find_last_buffer(qd_field_location_t *location, unsigned char **cursor, qd_buffer_t **buffer)
+{
+ //qd_buffer_t *buf = location->buffer;
+
+}
+
+
+/**
+ * qd_message_body_data_iterator
+ *
+ * Given a body_data object, return an iterator that refers to the content of that body data. This iterator
+ * shall not refer to the 3-byte performative header or the header for the vbin{8,32} field.
+ *
+ * The iterator must be freed eventually by the caller.
+ */
+qd_iterator_t *qd_message_body_data_iterator(const qd_message_body_data_t *body_data)
+{
+ return 0;
+}
+
+
+/**
+ * qd_message_body_data_buffer_count
+ *
+ * Return the number of buffers contained in the body_data object.
+ */
+int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data)
+{
+ return 0;
+}
+
+
+/**
+ * qd_message_body_data_buffers
+ *
+ * Populate the provided array of pn_raw_buffers with the addresses and lengths of the buffers in the body_data
+ * object. Don't fill more than count raw_buffers with data. Start at offset from the zero-th buffer in the
+ * body_data.
+ */
+int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffer_t *buffers, int offset, int count)
+{
+ return 0;
+}
+
+
+/**
+ * qd_message_body_data_release
+ *
+ * Decrement the fanout ref-counts for all of the buffers referred to in the body_data. If any have reached zero,
+ * remove them from the buffer list and free them. Never dec-ref the last buffer in the content's buffer list.
+ */
+void qd_message_body_data_release(qd_message_body_data_t *body_data)
+{
+}
+
+
+qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd_message_body_data_t **out_body_data)
+{
+ qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+ qd_message_body_data_t *body_data = 0;
+
+ if (!msg->body_cursor) {
+ //
+ // We haven't returned a body-data record for this message yet.
+ //
+ qd_message_depth_status_t status = qd_message_check_depth(in_msg, QD_DEPTH_BODY);
+ if (status == QD_MESSAGE_DEPTH_OK) {
+ body_data = new_qd_message_body_data_t();
+ ZERO(body_data);
+ body_data->owning_message = msg;
+ body_data->section = msg->content->section_body;
+
+ find_last_buffer(&body_data->section, &msg->body_cursor, &msg->body_buffer);
+ body_data->last_buffer = msg->body_buffer;
+
+ assert(DEQ_SIZE(msg->body_data_list) == 0);
+ DEQ_INSERT_TAIL(msg->body_data_list, body_data);
+ *out_body_data = body_data;
+ return QD_MESSAGE_BODY_DATA_OK;
+ } else if (status == QD_MESSAGE_DEPTH_INCOMPLETE)
+ return QD_MESSAGE_BODY_DATA_INCOMPLETE;
+ else if (status == QD_MESSAGE_DEPTH_INVALID)
+ return QD_MESSAGE_BODY_DATA_INVALID;
+ }
+
+ qd_section_status_t section_status;
+ qd_field_location_t location;
+
+ section_status = message_section_check(&msg->body_buffer, &msg->body_cursor,
+ BODY_DATA_SHORT, 3, TAGS_BINARY,
+ &location);
+
+ switch (section_status) {
+ case QD_SECTION_INVALID:
+ case QD_SECTION_NO_MATCH:
+ return QD_MESSAGE_BODY_DATA_INVALID;
+
+ case QD_SECTION_MATCH:
+ body_data = new_qd_message_body_data_t();
+ ZERO(body_data);
+ body_data->owning_message = msg;
+ body_data->section = location;
+ find_last_buffer(&body_data->section, &msg->body_cursor, &msg->body_buffer);
+ body_data->last_buffer = msg->body_buffer;
+ DEQ_INSERT_TAIL(msg->body_data_list, body_data);
+ *out_body_data = body_data;
+ return QD_MESSAGE_BODY_DATA_OK;
+
+ case QD_SECTION_NEED_MORE:
+ if (msg->content->receive_complete)
+ return QD_MESSAGE_BODY_DATA_NO_MORE;
+ else
+ return QD_MESSAGE_BODY_DATA_INCOMPLETE;
+ }
+
+ return QD_MESSAGE_BODY_DATA_NO_MORE;
+}
+
+
int qd_message_read_body(qd_message_t *in_msg, pn_raw_buffer_t* buffers, int length)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
diff --git a/src/message_private.h b/src/message_private.h
index d2b62f7..6dc901a 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -24,6 +24,8 @@
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/atomic.h>
+typedef struct qd_message_pvt_t qd_message_pvt_t;
+
/** @file
* Message representation.
*
@@ -59,6 +61,16 @@ typedef struct {
} qd_field_location_t;
+struct qd_message_body_data_t {
+ DEQ_LINKS(qd_message_body_data_t); // Linkage to form a DEQ
+ qd_message_pvt_t *owning_message; // Pointer to the owning message
+ qd_field_location_t section; // Section descriptor for the field
+ qd_buffer_t *last_buffer; // Pointer to the last buffer in the field
+};
+
+ALLOC_DECLARE(qd_message_body_data_t);
+DEQ_DECLARE(qd_message_body_data_t, qd_message_body_data_list_t);
+
// TODO - consider using pointers to qd_field_location_t below to save memory
// TODO - provide a way to allocate a message without a lock for the link-routing case.
// It's likely that link-routing will cause no contention for the message content.
@@ -93,7 +105,6 @@ typedef struct {
qd_field_location_t field_group_id;
qd_field_location_t field_group_sequence;
qd_field_location_t field_reply_to_group_id;
- qd_field_location_t body; // The body of the message
qd_buffer_t *parse_buffer; // Pointer to the buffer where parsing should resume, if needed
unsigned char *parse_cursor; // Pointer to octet in parse_buffer where parsing should resume, if needed
@@ -126,21 +137,24 @@ typedef struct {
uint8_t priority; // The priority of this message
} qd_message_content_t;
-typedef struct {
- qd_iterator_pointer_t cursor; // A pointer to the current location of the outgoing byte stream.
- qd_message_depth_t message_depth; // What is the depth of the message that has been received so far
- qd_message_depth_t sent_depth; // How much of the message has been sent? QD_DEPTH_NONE means nothing has been sent so far, QD_DEPTH_HEADER means the header has already been sent, dont send it again and so on.
- qd_message_content_t *content; // The actual content of the message. The content is never copied
- qd_buffer_list_t ma_to_override; // to field in outgoing message annotations.
- qd_buffer_list_t ma_trace; // trace list in outgoing message annotations
- qd_buffer_list_t ma_ingress; // ingress field in outgoing message annotations
- int ma_phase; // phase for the override address
- qd_field_location_t body_section; // Location of the current parsed body section
- bool strip_annotations_in;
- bool send_complete; // Has the message been completely received and completely sent?
- bool tag_sent; // Tags are sent
- bool is_fanout; // If msg is an outgoing fanout
-} qd_message_pvt_t;
+struct qd_message_pvt_t {
+ qd_iterator_pointer_t cursor; // A pointer to the current location of the outgoing byte stream.
+ qd_message_depth_t message_depth; // What is the depth of the message that has been received so far
+ qd_message_depth_t sent_depth; // How much of the message has been sent? QD_DEPTH_NONE means nothing has been sent so far, QD_DEPTH_HEADER means the header has already been sent, dont send it again and so on.
+ qd_message_content_t *content; // The actual content of the message. The content is never copied
+ qd_buffer_list_t ma_to_override; // to field in outgoing message annotations.
+ qd_buffer_list_t ma_trace; // trace list in outgoing message annotations
+ qd_buffer_list_t ma_ingress; // ingress field in outgoing message annotations
+ int ma_phase; // phase for the override address
+ qd_message_body_data_list_t body_data_list; // TODO - move this to the content for one-time parsing (TLR)
+ qd_message_body_data_t *next_body_data;
+ unsigned char *body_cursor;
+ qd_buffer_t *body_buffer;
+ bool strip_annotations_in;
+ bool send_complete; // Has the message been completely received and completely sent?
+ bool tag_sent; // Tags are sent
+ bool is_fanout; // If msg is an outgoing fanout
+};
ALLOC_DECLARE(qd_message_t);
ALLOC_DECLARE(qd_message_content_t);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org