You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/09/27 19:19:50 UTC
qpid-dispatch git commit: DISPATCH-1132 - Cleaned up message header
parsing
Repository: qpid-dispatch
Updated Branches:
refs/heads/master fa03ea400 -> e3e201652
DISPATCH-1132 - Cleaned up message header parsing
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/e3e20165
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/e3e20165
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/e3e20165
Branch: refs/heads/master
Commit: e3e201652ca167e9a6cd3d12ff3106165f3bb89a
Parents: fa03ea4
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Sep 27 15:19:15 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Sep 27 15:19:15 2018 -0400
----------------------------------------------------------------------
include/qpid/dispatch/message.h | 4 +-
src/message.c | 107 +++++++++++------------------------
src/message_private.h | 16 ++----
src/router_core/forwarder.c | 25 +++++++-
4 files changed, 65 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e3e20165/include/qpid/dispatch/message.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 27bc294..02a950e 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -422,8 +422,10 @@ void qd_message_set_aborted(const qd_message_t *msg, bool aborted);
/**
* Return message priority
* @param msg A pointer to the message
+ * @param priority [out] The priority value, if present
+ * @return True iff the priority was present in the message header
*/
-uint8_t qd_message_get_priority(qd_message_t *msg);
+bool qd_message_get_priority(qd_message_t *msg, uint8_t *priority);
///@}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e3e20165/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 77a49e5..5c836ab 100644
--- a/src/message.c
+++ b/src/message.c
@@ -773,61 +773,29 @@ static qd_field_location_t *qd_message_properties_field(qd_message_t *msg, qd_me
}
-// get the field location of a field in the message header (if it exists,
-// else 0)
-static qd_field_location_t *qd_message_header_field(qd_message_t *msg, qd_message_field_t field)
+static void qd_message_parse_priority(qd_message_t *in_msg)
{
- int first_header_field = QD_FIELD_DURABLE,
- last_header_field = QD_FIELD_DELIVERY_COUNT;
- static const intptr_t offsets[] = {
- // position of the fields' qd_field_location_t in the message content object
- (intptr_t) &((qd_message_content_t *)0)->field_durable,
- (intptr_t) &((qd_message_content_t *)0)->field_priority,
- (intptr_t) &((qd_message_content_t *)0)->field_ttl,
- (intptr_t) &((qd_message_content_t *)0)->field_first_acquirer,
- (intptr_t) &((qd_message_content_t *)0)->field_delivery_count
- };
- if (!(first_header_field <= field && field <= last_header_field)) {
- assert ( 0 );
- return 0;
+ qd_message_content_t *content = MSG_CONTENT(in_msg);
+ qd_iterator_t *iter = qd_message_field_iterator(in_msg, QD_FIELD_HEADER);
+
+ content->priority_parsed = true;
+ content->priority_present = false;
+
+ if (!!iter) {
+ qd_parsed_field_t *field = qd_parse(iter);
+ if (qd_parse_ok(field)) {
+ if (qd_parse_is_list(field) && qd_parse_sub_count(field) >= 2) {
+ qd_parsed_field_t *priority_field = qd_parse_sub_value(field, 1);
+ if (qd_parse_tag(priority_field) != QD_AMQP_NULL) {
+ uint32_t value = qd_parse_as_uint(priority_field);
+ content->priority = value >= QDR_N_PRIORITIES ? QDR_N_PRIORITIES - 1 : (uint8_t) (value & 0x00ff);
+ content->priority_present = true;
+ }
+ }
+ }
+ qd_parse_free(field);
+ qd_iterator_free(iter);
}
-
- qd_message_content_t *content = MSG_CONTENT(msg);
- if (!content->section_message_header.parsed) {
- if (!qd_message_check(msg, QD_DEPTH_HEADER) || !content->section_message_header.parsed)
- return 0;
- }
- // If it's already been parsed, just return it.
- const int index = field - first_header_field;
- qd_field_location_t *const location = (qd_field_location_t *)((char *)content + offsets[index]);
- if (location->parsed)
- return location;
- // requested field not parsed out. Need to parse out up to the requested field:
- qd_field_location_t section = content->section_message_header;
- qd_buffer_t *buffer = section.buffer;
- unsigned char *cursor = qd_buffer_base(buffer) + section.offset;
- advance(&cursor, &buffer, section.hdr_length);
- int start = start_list(&cursor, &buffer);
- if (index > start)
- return 0; // properties list too short
- // Make sure that all fields up to the requested one are parsed.
- int position = 0;
- while (position < index) {
- qd_field_location_t *f = (qd_field_location_t *)((char *)content + offsets[position]);
- // If it's parsed, advance over it. If not, parse it.
- if (f->parsed)
- advance(&cursor, &buffer, f->hdr_length + f->length);
- else
- if (!traverse_field(&cursor, &buffer, f))
- return 0;
- position++;
- }
- // all fields previous to the target have now been parsed and cursor/buffer
- // are in the correct position, parse out the field:
- if (traverse_field(&cursor, &buffer, location))
- return location;
- else
- return 0;
}
@@ -842,7 +810,10 @@ static qd_field_location_t *qd_message_field_location(qd_message_t *msg, qd_mess
switch (section) {
case QD_FIELD_HEADER:
- return qd_message_header_field(msg, field);
+ if (content->section_message_header.parsed ||
+ (qd_message_check(msg, QD_DEPTH_HEADER) && content->section_message_header.parsed))
+ return &content->section_message_header;
+ break;
case QD_FIELD_PROPERTIES:
return qd_message_properties_field(msg, field);
@@ -980,8 +951,7 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
copy->ma_phase = msg->ma_phase;
copy->strip_annotations_in = msg->strip_annotations_in;
- copy->content = content;
- copy->content->priority = content->priority;
+ copy->content = content;
copy->sent_depth = QD_DEPTH_NONE;
copy->cursor.buffer = 0;
@@ -1107,26 +1077,17 @@ void qd_message_add_fanout(qd_message_t *in_msg)
sys_atomic_inc(&msg->content->fanout);
}
-static void message_set_priority(qd_message_t *in_msg, uint8_t priority)
+bool qd_message_get_priority(qd_message_t *msg, uint8_t *priority)
{
- qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
- msg->content->priority = priority < QDR_N_PRIORITIES ? priority : QDR_N_PRIORITIES - 1;
-}
+ qd_message_content_t *content = MSG_CONTENT(msg);
-uint8_t qd_message_get_priority(qd_message_t *in_msg)
-{
- qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+ if (!content->priority_parsed)
+ qd_message_parse_priority(msg);
- uint8_t priority = 0;
- qd_iterator_t *priority_iterator = qd_message_field_iterator(in_msg, QD_FIELD_PRIORITY);
- if (priority_iterator) {
- if (qd_iterator_remaining(priority_iterator) > 0) {
- priority = qd_iterator_uint8(priority_iterator);
- message_set_priority(in_msg, priority);
- }
- }
- qd_iterator_free(priority_iterator);
- return msg->content->priority;
+ if (content->priority_present)
+ *priority = content->priority;
+
+ return content->priority_present;
}
bool qd_message_receive_complete(qd_message_t *in_msg)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e3e20165/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index 7b6c4ad..2f3a400 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -78,13 +78,6 @@ typedef struct {
qd_field_location_t section_footer; // The footer
qd_field_location_t field_user_annotations; // Opaque user message annotations, not a real field.
- // header fields
- qd_field_location_t field_durable;
- qd_field_location_t field_priority;
- qd_field_location_t field_ttl;
- qd_field_location_t field_first_acquirer;
- qd_field_location_t field_delivery_count;
-
qd_field_location_t field_message_id; // The string value of the message-id
qd_field_location_t field_user_id; // The string value of the user-id
qd_field_location_t field_to; // The string value of the to field
@@ -123,8 +116,10 @@ typedef struct {
bool q2_input_holdoff; // hold off calling pn_link_recv
bool aborted; // receive completed with abort flag set
bool disable_q2_holdoff; // Disable the Q2 flow control
- bool buffers_freed; // Has at least one buffer been freed ?
- uint8_t priority;
+ bool buffers_freed; // Has at least one buffer been freed ?
+ bool priority_parsed;
+ bool priority_present;
+ uint8_t priority; // The priority of this message
} qd_message_content_t;
typedef struct {
@@ -152,7 +147,8 @@ void qd_message_initialize();
qd_iterator_pointer_t qd_message_cursor(qd_message_pvt_t *msg);
-#define QDR_N_PRIORITIES 10
+#define QDR_N_PRIORITIES 10
+#define QDR_DEFAULT_PRIORITY 4
///@}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e3e20165/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 1a43eec..c5e86f6 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -264,6 +264,24 @@ void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_li
}
+/**
+ * Get the effective priority for a message.
+ *
+ * This function returns a priority value for a message (and address). If the message
+ * has no priority header, the default priority is chosen.
+ *
+ * TODO: Add the ability to get the priority from the address if not present in the message
+ */
+static uint8_t qdr_forward_effective_priority(qd_message_t *msg, qdr_address_t *addr)
+{
+ uint8_t priority;
+ bool has_priority = qd_message_get_priority(msg, &priority);
+ if (!has_priority)
+ priority = QDR_DEFAULT_PRIORITY;
+ return priority;
+}
+
+
int qdr_forward_multicast_CT(qdr_core_t *core,
qdr_address_t *addr,
qd_message_t *msg,
@@ -276,7 +294,8 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
qd_bitmask_t *link_exclusion = !!in_delivery ? in_delivery->link_exclusion : 0;
bool presettled = !!in_delivery ? in_delivery->settled : true;
bool receive_complete = qd_message_receive_complete(qdr_delivery_message(in_delivery));
- int priority = qd_message_get_priority(msg);
+ uint8_t priority = qdr_forward_effective_priority(msg, addr);
+
qdr_forward_deliver_info_list_t deliver_info_list;
DEQ_INIT(deliver_info_list);
@@ -566,7 +585,7 @@ int qdr_forward_closest_CT(qdr_core_t *core,
else
next_node = rnode;
- uint8_t priority = qd_message_get_priority(msg);
+ uint8_t priority = qdr_forward_effective_priority(msg, addr);
out_link = control ? PEER_CONTROL_LINK(core, next_node) : peer_data_link(core, next_node, priority);
if (out_link) {
out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
@@ -671,7 +690,7 @@ int qdr_forward_balanced_CT(qdr_core_t *core,
for (QD_BITMASK_EACH(addr->rnodes, node_bit, c)) {
qdr_node_t *rnode = core->routers_by_mask_bit[node_bit];
qdr_node_t *next_node = rnode->next_hop ? rnode->next_hop : rnode;
- uint8_t priority = qd_message_get_priority(msg);
+ uint8_t priority = qdr_forward_effective_priority(msg, addr);
qdr_link_t *link = peer_data_link(core, next_node, priority);
if (!link) continue;
int link_bit = link->conn->mask_bit;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org