You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/04 18:58:09 UTC
[qpid-dispatch] 23/32: Dataplane: (from gsim) Implementation of
qd_message_read_body.
This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 20f841a982cab439cc1e902f0b4f961d21133b60
Author: Ted Ross <tr...@apache.org>
AuthorDate: Fri Jun 19 13:19:12 2020 -0400
Dataplane: (from gsim) Implementation of qd_message_read_body.
---
src/message.c | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 55 insertions(+)
diff --git a/src/message.c b/src/message.c
index 97bcab2..55d134e 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2237,6 +2237,61 @@ int qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers)
}
+int qd_message_read_body(qd_message_t *in_msg, pn_raw_buffer_t* buffers, int length)
+{
+ qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+ if (!(msg->cursor.buffer && msg->cursor.cursor)) {
+ qd_field_location_t *loc = qd_message_field_location(in_msg, QD_FIELD_BODY);
+ if (!loc || loc->tag == QD_AMQP_NULL)
+ return 0;
+ // TODO: need to actually determine this, could be different if vbin32 sent
+ int preamble = 5;
+ if (loc->offset + preamble < qd_buffer_size(loc->buffer)) {
+ msg->cursor.buffer = loc->buffer;
+ msg->cursor.cursor = qd_buffer_base(loc->buffer) + loc->offset + preamble;
+ } else {
+ msg->cursor.buffer = DEQ_NEXT(loc->buffer);
+ if (!msg->cursor.buffer) return 0;
+ msg->cursor.cursor = qd_buffer_base(msg->cursor.buffer) + ((loc->offset + preamble) - qd_buffer_size(loc->buffer));
+ }
+ }
+
+ qd_buffer_t *buf = msg->cursor.buffer;
+ unsigned char *cursor = msg->cursor.cursor;
+
+ // if we are at the end of the current buffer, try to move to the
+ // next buffer
+ if (cursor == qd_buffer_base(buf) + qd_buffer_size(buf)) {
+ buf = DEQ_NEXT(buf);
+ if (buf) {
+ cursor = qd_buffer_base(buf);
+ msg->cursor.buffer = buf;
+ msg->cursor.cursor = cursor;
+ } else {
+ return 0;
+ }
+ }
+
+ int count;
+ for (count = 0; count < length && buf; count++) {
+ buffers[count].bytes = (char*) qd_buffer_base(buf);
+ buffers[count].capacity = qd_buffer_size(buf);
+ buffers[count].size = qd_buffer_size(buf);
+ buffers[count].offset = cursor - qd_buffer_base(buf);
+ buffers[count].context = (uintptr_t) buf;
+ buf = DEQ_NEXT(buf);
+ if (buf) {
+ cursor = qd_buffer_base(buf);
+ msg->cursor.buffer = buf;
+ msg->cursor.cursor = cursor;
+ } else {
+ msg->cursor.cursor = qd_buffer_base(msg->cursor.buffer) + qd_buffer_size(msg->cursor.buffer);
+ }
+ }
+ return count;
+}
+
+
qd_parsed_field_t *qd_message_get_ingress (qd_message_t *msg)
{
return ((qd_message_pvt_t*)msg)->content->ma_pf_ingress;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org