You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/09/27 19:19:50 UTC

qpid-dispatch git commit: DISPATCH-1132 - Cleaned up message header parsing

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master fa03ea400 -> e3e201652


DISPATCH-1132 - Cleaned up message header parsing


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/e3e20165
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/e3e20165
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/e3e20165

Branch: refs/heads/master
Commit: e3e201652ca167e9a6cd3d12ff3106165f3bb89a
Parents: fa03ea4
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Sep 27 15:19:15 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Sep 27 15:19:15 2018 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/message.h |   4 +-
 src/message.c                   | 107 +++++++++++------------------------
 src/message_private.h           |  16 ++----
 src/router_core/forwarder.c     |  25 +++++++-
 4 files changed, 65 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e3e20165/include/qpid/dispatch/message.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 27bc294..02a950e 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -422,8 +422,10 @@ void qd_message_set_aborted(const qd_message_t *msg, bool aborted);
 /**
  * Return message priority
  * @param msg A pointer to the message
+ * @param priority [out] The priority value, if present
+ * @return True iff the priority was present in the message header
  */
-uint8_t qd_message_get_priority(qd_message_t *msg);
+bool qd_message_get_priority(qd_message_t *msg, uint8_t *priority);
 
 
 ///@}

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e3e20165/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 77a49e5..5c836ab 100644
--- a/src/message.c
+++ b/src/message.c
@@ -773,61 +773,29 @@ static qd_field_location_t *qd_message_properties_field(qd_message_t *msg, qd_me
 }
 
 
-// get the field location of a field in the message header (if it exists,
-// else 0)
-static qd_field_location_t *qd_message_header_field(qd_message_t *msg, qd_message_field_t field)
+static void qd_message_parse_priority(qd_message_t *in_msg)
 {
-    int first_header_field = QD_FIELD_DURABLE,
-        last_header_field  = QD_FIELD_DELIVERY_COUNT;
-    static const intptr_t offsets[] = {
-        // position of the fields' qd_field_location_t in the message content object
-        (intptr_t) &((qd_message_content_t *)0)->field_durable,
-        (intptr_t) &((qd_message_content_t *)0)->field_priority,
-        (intptr_t) &((qd_message_content_t *)0)->field_ttl,
-        (intptr_t) &((qd_message_content_t *)0)->field_first_acquirer,
-        (intptr_t) &((qd_message_content_t *)0)->field_delivery_count
-    };
-    if (!(first_header_field <= field && field <= last_header_field)) {
-        assert ( 0 );
-        return 0;
+    qd_message_content_t *content  = MSG_CONTENT(in_msg);
+    qd_iterator_t        *iter     = qd_message_field_iterator(in_msg, QD_FIELD_HEADER);
+
+    content->priority_parsed  = true;
+    content->priority_present = false;
+
+    if (!!iter) {
+        qd_parsed_field_t *field = qd_parse(iter);
+        if (qd_parse_ok(field)) {
+            if (qd_parse_is_list(field) && qd_parse_sub_count(field) >= 2) {
+                qd_parsed_field_t *priority_field = qd_parse_sub_value(field, 1);
+                if (qd_parse_tag(priority_field) != QD_AMQP_NULL) {
+                    uint32_t value = qd_parse_as_uint(priority_field);
+                    content->priority = value >= QDR_N_PRIORITIES ? QDR_N_PRIORITIES - 1 : (uint8_t) (value & 0x00ff);
+                    content->priority_present = true;
+                }
+            }
+        }
+        qd_parse_free(field);
+        qd_iterator_free(iter);
     }
-
-    qd_message_content_t *content = MSG_CONTENT(msg);
-     if (!content->section_message_header.parsed) {
-         if (!qd_message_check(msg, QD_DEPTH_HEADER) || !content->section_message_header.parsed)
-             return 0;
-     }
-    // If it's already been parsed, just return it.
-    const int index = field - first_header_field;
-    qd_field_location_t *const location = (qd_field_location_t *)((char *)content + offsets[index]);
-    if (location->parsed)
-        return location;
-    // requested field not parsed out.  Need to parse out up to the requested field:
-    qd_field_location_t section = content->section_message_header;
-    qd_buffer_t   *buffer = section.buffer;
-    unsigned char *cursor = qd_buffer_base(buffer) + section.offset;
-    advance(&cursor, &buffer, section.hdr_length);
-    int start = start_list(&cursor, &buffer);
-    if (index > start)
-        return 0;  // properties list too short
-    // Make sure that all fields up to the requested one are parsed.
-    int position = 0;
-    while (position < index) {
-        qd_field_location_t *f = (qd_field_location_t *)((char *)content + offsets[position]);
-        // If it's parsed, advance over it. If not, parse it.
-        if (f->parsed)
-            advance(&cursor, &buffer, f->hdr_length + f->length);
-        else
-        if (!traverse_field(&cursor, &buffer, f))
-            return 0;
-        position++;
-     }
-    // all fields previous to the target have now been parsed and cursor/buffer
-    // are in the correct position, parse out the field:
-    if (traverse_field(&cursor, &buffer, location))
-        return location;
-    else
-         return 0;
 }
 
 
@@ -842,7 +810,10 @@ static qd_field_location_t *qd_message_field_location(qd_message_t *msg, qd_mess
 
     switch (section) {
     case QD_FIELD_HEADER:
-        return qd_message_header_field(msg, field);
+        if (content->section_message_header.parsed ||
+            (qd_message_check(msg, QD_DEPTH_HEADER) && content->section_message_header.parsed))
+            return &content->section_message_header;
+        break;
 
     case QD_FIELD_PROPERTIES:
         return qd_message_properties_field(msg, field);
@@ -980,8 +951,7 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
     copy->ma_phase = msg->ma_phase;
     copy->strip_annotations_in  = msg->strip_annotations_in;
 
-    copy->content           = content;
-    copy->content->priority = content->priority;
+    copy->content = content;
 
     copy->sent_depth    = QD_DEPTH_NONE;
     copy->cursor.buffer = 0;
@@ -1107,26 +1077,17 @@ void qd_message_add_fanout(qd_message_t *in_msg)
     sys_atomic_inc(&msg->content->fanout);
 }
 
-static void message_set_priority(qd_message_t *in_msg, uint8_t priority)
+bool qd_message_get_priority(qd_message_t *msg, uint8_t *priority)
 {
-    qd_message_pvt_t *msg  = (qd_message_pvt_t*) in_msg;
-    msg->content->priority = priority < QDR_N_PRIORITIES ? priority : QDR_N_PRIORITIES - 1;
-}
+    qd_message_content_t *content = MSG_CONTENT(msg);
 
-uint8_t qd_message_get_priority(qd_message_t *in_msg)
-{
-    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+    if (!content->priority_parsed)
+        qd_message_parse_priority(msg);
 
-    uint8_t  priority = 0;
-    qd_iterator_t *priority_iterator = qd_message_field_iterator(in_msg, QD_FIELD_PRIORITY);
-    if (priority_iterator) {
-        if (qd_iterator_remaining(priority_iterator) > 0) {
-            priority = qd_iterator_uint8(priority_iterator);
-            message_set_priority(in_msg, priority);
-        }
-    }
-    qd_iterator_free(priority_iterator);
-    return msg->content->priority;
+    if (content->priority_present)
+        *priority = content->priority;
+
+    return content->priority_present;
 }
 
 bool qd_message_receive_complete(qd_message_t *in_msg)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e3e20165/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index 7b6c4ad..2f3a400 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -78,13 +78,6 @@ typedef struct {
     qd_field_location_t  section_footer;                  // The footer
     qd_field_location_t  field_user_annotations;          // Opaque user message annotations, not a real field.
 
-    // header fields
-    qd_field_location_t  field_durable;
-    qd_field_location_t  field_priority;
-    qd_field_location_t  field_ttl;
-    qd_field_location_t  field_first_acquirer;
-    qd_field_location_t  field_delivery_count;
-
     qd_field_location_t  field_message_id;                // The string value of the message-id
     qd_field_location_t  field_user_id;                   // The string value of the user-id
     qd_field_location_t  field_to;                        // The string value of the to field
@@ -123,8 +116,10 @@ typedef struct {
     bool                 q2_input_holdoff;               // hold off calling pn_link_recv
     bool                 aborted;                        // receive completed with abort flag set
     bool                 disable_q2_holdoff;             // Disable the Q2 flow control
-    bool                 buffers_freed;                   // Has at least one buffer been freed ?
-    uint8_t              priority;
+    bool                 buffers_freed;                  // Has at least one buffer been freed ?
+    bool                 priority_parsed;
+    bool                 priority_present;
+    uint8_t              priority;                       // The priority of this message
 } qd_message_content_t;
 
 typedef struct {
@@ -152,7 +147,8 @@ void qd_message_initialize();
 
 qd_iterator_pointer_t qd_message_cursor(qd_message_pvt_t *msg);
 
-#define QDR_N_PRIORITIES 10
+#define QDR_N_PRIORITIES     10
+#define QDR_DEFAULT_PRIORITY  4
 
 ///@}
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e3e20165/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 1a43eec..c5e86f6 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -264,6 +264,24 @@ void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_li
 }
 
 
+/**
+ * Get the effective priority for a message.
+ *
+ * This function returns a priority value for a message (and address).  If the message
+ * has no priority header, the default priority is chosen.
+ *
+ * TODO: Add the ability to get the priority from the address if not present in the message
+ */
+static uint8_t qdr_forward_effective_priority(qd_message_t *msg, qdr_address_t *addr)
+{
+    uint8_t priority;
+    bool    has_priority = qd_message_get_priority(msg, &priority);
+    if (!has_priority)
+        priority = QDR_DEFAULT_PRIORITY;
+    return priority;
+}
+
+
 int qdr_forward_multicast_CT(qdr_core_t      *core,
                              qdr_address_t   *addr,
                              qd_message_t    *msg,
@@ -276,7 +294,8 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
     qd_bitmask_t *link_exclusion       = !!in_delivery ? in_delivery->link_exclusion : 0;
     bool          presettled           = !!in_delivery ? in_delivery->settled : true;
     bool          receive_complete     = qd_message_receive_complete(qdr_delivery_message(in_delivery));
-    int           priority             = qd_message_get_priority(msg);
+    uint8_t       priority             = qdr_forward_effective_priority(msg, addr);
+
     qdr_forward_deliver_info_list_t deliver_info_list;
     DEQ_INIT(deliver_info_list);
 
@@ -566,7 +585,7 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
             else
                 next_node = rnode;
 
-            uint8_t priority = qd_message_get_priority(msg);
+            uint8_t priority = qdr_forward_effective_priority(msg, addr);
             out_link = control ? PEER_CONTROL_LINK(core, next_node) : peer_data_link(core, next_node, priority);
             if (out_link) {
                 out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
@@ -671,7 +690,7 @@ int qdr_forward_balanced_CT(qdr_core_t      *core,
         for (QD_BITMASK_EACH(addr->rnodes, node_bit, c)) {
             qdr_node_t *rnode     = core->routers_by_mask_bit[node_bit];
             qdr_node_t *next_node = rnode->next_hop ? rnode->next_hop : rnode;
-            uint8_t     priority  = qd_message_get_priority(msg);
+            uint8_t     priority  = qdr_forward_effective_priority(msg, addr);
             qdr_link_t *link      = peer_data_link(core, next_node, priority);
             if (!link) continue;
             int         link_bit  = link->conn->mask_bit;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org