You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/04 18:58:09 UTC

[qpid-dispatch] 23/32: Dataplane: (from gsim) Implementation of qd_message_read_body.

This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 20f841a982cab439cc1e902f0b4f961d21133b60
Author: Ted Ross <tr...@apache.org>
AuthorDate: Fri Jun 19 13:19:12 2020 -0400

    Dataplane: (from gsim) Implementation of qd_message_read_body.
---
 src/message.c | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/src/message.c b/src/message.c
index 97bcab2..55d134e 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2237,6 +2237,61 @@ int qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers)
 }
 
 
+int qd_message_read_body(qd_message_t *in_msg, pn_raw_buffer_t* buffers, int length)
+{   // Describes up to 'length' message buffers, starting at the saved read cursor, in 'buffers'; returns the number of entries filled (0 when there is nothing to hand out).
+    qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;  // private view of the message; msg->cursor carries the read position between calls
+    if (!(msg->cursor.buffer && msg->cursor.cursor)) {  // cursor not fully set yet: position it using the body field's location
+        qd_field_location_t  *loc     = qd_message_field_location(in_msg, QD_FIELD_BODY);
+        if (!loc || loc->tag == QD_AMQP_NULL)  // no body location available, or the body is tagged as an AMQP null
+            return 0;
+        // TODO: need to actually determine this, could be different if vbin32 sent
+        int preamble = 5;  // fixed number of bytes skipped past loc->offset before the cursor is placed (see TODO above)
+        if (loc->offset + preamble < qd_buffer_size(loc->buffer)) {  // skipped-to position still lies inside loc->buffer
+            msg->cursor.buffer = loc->buffer;
+            msg->cursor.cursor = qd_buffer_base(loc->buffer) + loc->offset + preamble;
+        } else {  // skipped-to position is at or beyond the end of loc->buffer
+            msg->cursor.buffer = DEQ_NEXT(loc->buffer);
+            if (!msg->cursor.buffer) return 0;  // no following buffer, so there is nothing to read
+            msg->cursor.cursor = qd_buffer_base(msg->cursor.buffer) + ((loc->offset + preamble) - qd_buffer_size(loc->buffer));  // carry the overshoot into the next buffer
+        }
+    }
+
+    qd_buffer_t   *buf    = msg->cursor.buffer;  // local walking copies of the saved cursor
+    unsigned char *cursor = msg->cursor.cursor;
+
+    // if we are at the end of the current buffer, try to move to the
+    // next buffer
+    if (cursor == qd_buffer_base(buf) + qd_buffer_size(buf)) {
+        buf = DEQ_NEXT(buf);
+        if (buf) {
+            cursor = qd_buffer_base(buf);
+            msg->cursor.buffer = buf;
+            msg->cursor.cursor = cursor;
+        } else {
+            return 0;  // cursor is at the end of the last buffer: no entries produced
+        }
+    }
+
+    int count;
+    for (count = 0; count < length && buf; count++) {  // one output entry per message buffer, until 'length' entries are filled or the chain ends
+        buffers[count].bytes = (char*) qd_buffer_base(buf);  // entry points at the start of the buffer, not at the cursor
+        buffers[count].capacity = qd_buffer_size(buf);
+        buffers[count].size = qd_buffer_size(buf);  // NOTE(review): full buffer size even when offset below is non-zero - confirm consumers treat size as independent of offset
+        buffers[count].offset = cursor - qd_buffer_base(buf);  // cursor position within this buffer; 0 for later entries because cursor is reset to each buffer's base below
+        buffers[count].context = (uintptr_t) buf;  // stash the qd_buffer_t pointer in the entry's context field
+        buf = DEQ_NEXT(buf);
+        if (buf) {  // more buffers follow: advance the saved cursor to the start of the next one
+            cursor = qd_buffer_base(buf);
+            msg->cursor.buffer = buf;
+            msg->cursor.cursor = cursor;
+        } else {  // chain exhausted: park the saved cursor at the end of the last buffer so a later call takes the end-of-buffer path above
+            msg->cursor.cursor = qd_buffer_base(msg->cursor.buffer) + qd_buffer_size(msg->cursor.buffer);
+        }
+    }
+    return count;  // number of entries written to 'buffers'
+}
+
+
 qd_parsed_field_t *qd_message_get_ingress    (qd_message_t *msg)
 {
     return ((qd_message_pvt_t*)msg)->content->ma_pf_ingress;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org