You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/09/10 19:16:14 UTC
qpid-dispatch git commit: DISPATCH-1096 - priority messaging support
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 03ba19e65 -> 710b7059b
DISPATCH-1096 - priority messaging support
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/710b7059
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/710b7059
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/710b7059
Branch: refs/heads/master
Commit: 710b7059b77a72337a25c79b1e4e01856ca484b7
Parents: 03ba19e
Author: Michael Goulish <mg...@redhat.com>
Authored: Mon Sep 10 11:53:37 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon Sep 10 14:19:25 2018 -0400
----------------------------------------------------------------------
include/qpid/dispatch/iterator.h | 2 +
include/qpid/dispatch/message.h | 7 +
src/iterator.c | 8 ++
src/message.c | 90 +++++++++---
src/message_private.h | 11 ++
src/router_core/connections.c | 216 ++++++++++++++++-------------
src/router_core/forwarder.c | 52 +++++--
src/router_core/route_tables.c | 7 +-
src/router_core/router_core_private.h | 14 +-
src/router_core/transfer.c | 15 +-
10 files changed, 287 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/include/qpid/dispatch/iterator.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/iterator.h b/include/qpid/dispatch/iterator.h
index fa4267c..74d73ca 100644
--- a/include/qpid/dispatch/iterator.h
+++ b/include/qpid/dispatch/iterator.h
@@ -327,6 +327,8 @@ int qd_iterator_ncopy(qd_iterator_t *iter, unsigned char* buffer, int n);
*/
unsigned char *qd_iterator_copy(qd_iterator_t *iter);
+uint8_t qd_iterator_uint8(qd_iterator_t *iter);
+
/**
* Return a new iterator that is a duplicate of the original iterator, referring
* to the same base data. If the input iterator pointer is NULL, the duplicate
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/include/qpid/dispatch/message.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index ca2ab47..ec0b901 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -411,6 +411,13 @@ bool qd_message_aborted(const qd_message_t *msg);
*/
void qd_message_set_aborted(const qd_message_t *msg, bool aborted);
+/**
+ * Return message priority
+ * @param msg A pointer to the message
+ */
+uint8_t qd_message_get_priority(qd_message_t *msg);
+
+
///@}
#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/iterator.c
----------------------------------------------------------------------
diff --git a/src/iterator.c b/src/iterator.c
index 8889b17..910aa19 100644
--- a/src/iterator.c
+++ b/src/iterator.c
@@ -768,6 +768,14 @@ char* qd_iterator_strncpy(qd_iterator_t *iter, char* buffer, int n)
}
+uint8_t qd_iterator_uint8(qd_iterator_t *iter ) {
+ qd_iterator_reset(iter);
+ if (qd_iterator_end(iter))
+ return 0;
+ return (uint8_t) qd_iterator_octet(iter);
+}
+
+
unsigned char *qd_iterator_copy(qd_iterator_t *iter)
{
if (!iter)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index fce3394..35363cf 100644
--- a/src/message.c
+++ b/src/message.c
@@ -724,22 +724,57 @@ static qd_field_location_t *qd_message_properties_field(qd_message_t *msg, qd_me
// else 0)
static qd_field_location_t *qd_message_header_field(qd_message_t *msg, qd_message_field_t field)
{
- qd_message_content_t *content = MSG_CONTENT(msg);
-
- if (!content->section_message_header.parsed) {
- if (!qd_message_check(msg, QD_DEPTH_HEADER) || !content->section_message_header.parsed)
- return 0;
- }
-
- switch (field) {
- case QD_FIELD_HEADER:
- return &content->section_message_properties;
- default:
- // TBD: add header fields as needed (see qd_message_properties_field()
- // as an example)
- assert(false);
+ int first_header_field = QD_FIELD_DURABLE,
+ last_header_field = QD_FIELD_DELIVERY_COUNT;
+ static const intptr_t offsets[] = {
+ // position of the fields' qd_field_location_t in the message content object
+ (intptr_t) &((qd_message_content_t *)0)->field_durable,
+ (intptr_t) &((qd_message_content_t *)0)->field_priority,
+ (intptr_t) &((qd_message_content_t *)0)->field_ttl,
+ (intptr_t) &((qd_message_content_t *)0)->field_first_acquirer,
+ (intptr_t) &((qd_message_content_t *)0)->field_delivery_count
+ };
+ if (!(first_header_field <= field && field <= last_header_field)) {
+ assert ( 0 );
return 0;
}
+
+ qd_message_content_t *content = MSG_CONTENT(msg);
+ if (!content->section_message_header.parsed) {
+ if (!qd_message_check(msg, QD_DEPTH_HEADER) || !content->section_message_header.parsed)
+ return 0;
+ }
+ // If it's already been parsed, just return it.
+ const int index = field - first_header_field;
+ qd_field_location_t *const location = (qd_field_location_t *)((char *)content + offsets[index]);
+ if (location->parsed)
+ return location;
+ // requested field not parsed out. Need to parse out up to the requested field:
+ qd_field_location_t section = content->section_message_header;
+ qd_buffer_t *buffer = section.buffer;
+ unsigned char *cursor = qd_buffer_base(buffer) + section.offset;
+ advance(&cursor, &buffer, section.hdr_length, 0, 0);
+ int start = start_list(&cursor, &buffer);
+ if (index > start)
+ return 0; // properties list too short
+ // Make sure that all fields up to the requested one are parsed.
+ int position = 0;
+ while (position < index) {
+ qd_field_location_t *f = (qd_field_location_t *)((char *)content + offsets[position]);
+ // If it's parsed, advance over it. If not, parse it.
+ if (f->parsed)
+ advance(&cursor, &buffer, f->hdr_length + f->length, 0, 0);
+ else
+ if (!traverse_field(&cursor, &buffer, f))
+ return 0;
+ position++;
+ }
+ // all fields previous to the target have now been parsed and cursor/buffer
+ // are in the correct position, parse out the field:
+ if (traverse_field(&cursor, &buffer, location))
+ return location;
+ else
+ return 0;
}
@@ -892,7 +927,8 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
copy->ma_phase = msg->ma_phase;
copy->strip_annotations_in = msg->strip_annotations_in;
- copy->content = content;
+ copy->content = content;
+ copy->content->priority = content->priority;
copy->sent_depth = QD_DEPTH_NONE;
copy->cursor.buffer = 0;
@@ -1018,6 +1054,28 @@ void qd_message_add_fanout(qd_message_t *in_msg)
sys_atomic_inc(&msg->content->fanout);
}
+static void message_set_priority(qd_message_t *in_msg, uint8_t priority)
+{
+ qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+ msg->content->priority = priority < QDR_N_PRIORITIES ? priority : QDR_N_PRIORITIES - 1;
+}
+
+uint8_t qd_message_get_priority(qd_message_t *in_msg)
+{
+ qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+
+ uint8_t priority = 0;
+ qd_iterator_t *priority_iterator = qd_message_field_iterator(in_msg, QD_FIELD_PRIORITY);
+ if (priority_iterator) {
+ if (qd_iterator_remaining(priority_iterator) > 0) {
+ priority = qd_iterator_uint8(priority_iterator);
+ message_set_priority(in_msg, priority);
+ }
+ }
+ qd_iterator_free(priority_iterator);
+ return msg->content->priority;
+}
+
bool qd_message_receive_complete(qd_message_t *in_msg)
{
if (!in_msg)
@@ -1800,7 +1858,7 @@ void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *b
qd_compose_start_list(field);
qd_compose_insert_bool(field, 0); // durable
- //qd_compose_insert_null(field); // priority
+ qd_compose_insert_null(field); // priority
//qd_compose_insert_null(field); // ttl
//qd_compose_insert_boolean(field, 0); // first-acquirer
//qd_compose_insert_uint(field, 0); // delivery-count
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index fe8147f..14d2593 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -77,6 +77,14 @@ typedef struct {
qd_field_location_t section_body; // The message body: Data
qd_field_location_t section_footer; // The footer
qd_field_location_t field_user_annotations; // Opaque user message annotations, not a real field.
+
+ // header fields
+ qd_field_location_t field_durable;
+ qd_field_location_t field_priority;
+ qd_field_location_t field_ttl;
+ qd_field_location_t field_first_acquirer;
+ qd_field_location_t field_delivery_count;
+
qd_field_location_t field_message_id; // The string value of the message-id
qd_field_location_t field_user_id; // The string value of the user-id
qd_field_location_t field_to; // The string value of the to field
@@ -115,6 +123,7 @@ typedef struct {
bool q2_input_holdoff; // hold off calling pn_link_recv
bool aborted; // receive completed with abort flag set
bool disable_q2_holdoff; // Disable the Q2 flow control
+ uint8_t priority;
} qd_message_content_t;
typedef struct {
@@ -142,6 +151,8 @@ void qd_message_initialize();
qd_iterator_pointer_t qd_message_cursor(qd_message_pvt_t *msg);
+#define QDR_N_PRIORITIES 10
+
///@}
#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 786877e..9716d7e 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -205,7 +205,7 @@ const char *qdr_connection_get_tenant_space(const qdr_connection_t *conn, int *l
int qdr_connection_process(qdr_connection_t *conn)
{
qdr_connection_work_list_t work_list;
- qdr_link_ref_list_t links_with_work;
+ qdr_link_ref_list_t links_with_work[QDR_N_PRIORITIES];
qdr_core_t *core = conn->core;
qdr_link_ref_t *ref;
@@ -216,7 +216,9 @@ int qdr_connection_process(qdr_connection_t *conn)
sys_mutex_lock(conn->work_lock);
DEQ_MOVE(conn->work_list, work_list);
- DEQ_MOVE(conn->links_with_work, links_with_work);
+ for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
+ DEQ_MOVE(conn->links_with_work[priority], links_with_work[priority]);
+ }
sys_mutex_unlock(conn->work_lock);
event_count += DEQ_SIZE(work_list);
@@ -241,97 +243,100 @@ int qdr_connection_process(qdr_connection_t *conn)
work = DEQ_HEAD(work_list);
}
- do {
- qdr_link_work_t *link_work;
- free_link = false;
+ // Process the links_with_work array from highest to lowest priority.
+ for (int priority = QDR_N_PRIORITIES - 1; priority >= 0; -- priority) {
+ do {
+ qdr_link_work_t *link_work;
+ free_link = false;
- sys_mutex_lock(conn->work_lock);
- ref = DEQ_HEAD(links_with_work);
- if (ref) {
- link = ref->link;
- qdr_del_link_ref(&links_with_work, ref->link, QDR_LINK_LIST_CLASS_WORK);
+ sys_mutex_lock(conn->work_lock);
+ ref = DEQ_HEAD(links_with_work[priority]);
+ if (ref) {
+ link = ref->link;
+ qdr_del_link_ref(links_with_work + priority, ref->link, QDR_LINK_LIST_CLASS_WORK);
+
+ link_work = DEQ_HEAD(link->work_list);
+ if (link_work) {
+ DEQ_REMOVE_HEAD(link->work_list);
+ link_work->processing = true;
+ }
+ } else
+ link = 0;
+ sys_mutex_unlock(conn->work_lock);
- link_work = DEQ_HEAD(link->work_list);
- if (link_work) {
- DEQ_REMOVE_HEAD(link->work_list);
- link_work->processing = true;
- }
- } else
- link = 0;
- sys_mutex_unlock(conn->work_lock);
+ if (link) {
- if (link) {
+ //
+ // Handle disposition/settlement updates
+ //
+ qdr_delivery_ref_list_t updated_deliveries;
+ sys_mutex_lock(conn->work_lock);
+ DEQ_MOVE(link->updated_deliveries, updated_deliveries);
+ sys_mutex_unlock(conn->work_lock);
- //
- // Handle disposition/settlement updates
- //
- qdr_delivery_ref_list_t updated_deliveries;
- sys_mutex_lock(conn->work_lock);
- DEQ_MOVE(link->updated_deliveries, updated_deliveries);
- sys_mutex_unlock(conn->work_lock);
+ qdr_delivery_ref_t *dref = DEQ_HEAD(updated_deliveries);
+ while (dref) {
+ core->delivery_update_handler(core->user_context, dref->dlv, dref->dlv->disposition, dref->dlv->settled);
+ qdr_delivery_decref(core, dref->dlv, "qdr_connection_process - remove from updated list");
+ qdr_del_delivery_ref(&updated_deliveries, dref);
+ dref = DEQ_HEAD(updated_deliveries);
+ event_count++;
+ }
- qdr_delivery_ref_t *dref = DEQ_HEAD(updated_deliveries);
- while (dref) {
- core->delivery_update_handler(core->user_context, dref->dlv, dref->dlv->disposition, dref->dlv->settled);
- qdr_delivery_decref(core, dref->dlv, "qdr_connection_process - remove from updated list");
- qdr_del_delivery_ref(&updated_deliveries, dref);
- dref = DEQ_HEAD(updated_deliveries);
- event_count++;
- }
+ while (link_work) {
+ switch (link_work->work_type) {
+ case QDR_LINK_WORK_DELIVERY :
+ {
+ int count = core->push_handler(core->user_context, link, link_work->value);
+ assert(count <= link_work->value);
+ link_work->value -= count;
+ break;
+ }
- while (link_work) {
- switch (link_work->work_type) {
- case QDR_LINK_WORK_DELIVERY :
- {
- int count = core->push_handler(core->user_context, link, link_work->value);
- assert(count <= link_work->value);
- link_work->value -= count;
+ case QDR_LINK_WORK_FLOW :
+ if (link_work->value > 0)
+ core->flow_handler(core->user_context, link, link_work->value);
+ if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_SET)
+ core->drain_handler(core->user_context, link, true);
+ else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_CLEAR)
+ core->drain_handler(core->user_context, link, false);
+ else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_DRAINED)
+ core->drained_handler(core->user_context, link);
break;
- }
- case QDR_LINK_WORK_FLOW :
- if (link_work->value > 0)
- core->flow_handler(core->user_context, link, link_work->value);
- if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_SET)
- core->drain_handler(core->user_context, link, true);
- else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_CLEAR)
- core->drain_handler(core->user_context, link, false);
- else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_DRAINED)
- core->drained_handler(core->user_context, link);
- break;
-
- case QDR_LINK_WORK_FIRST_DETACH :
- core->detach_handler(core->user_context, link, link_work->error, true, link_work->close_link);
- break;
-
- case QDR_LINK_WORK_SECOND_DETACH :
- core->detach_handler(core->user_context, link, link_work->error, false, link_work->close_link);
- free_link = true;
- break;
- }
+ case QDR_LINK_WORK_FIRST_DETACH :
+ core->detach_handler(core->user_context, link, link_work->error, true, link_work->close_link);
+ break;
- sys_mutex_lock(conn->work_lock);
- if (link_work->work_type == QDR_LINK_WORK_DELIVERY && link_work->value > 0 && !link->detach_received) {
- DEQ_INSERT_HEAD(link->work_list, link_work);
- link_work->processing = false;
- link_work = 0; // Halt work processing
- } else {
- qdr_error_free(link_work->error);
- free_qdr_link_work_t(link_work);
- link_work = DEQ_HEAD(link->work_list);
- if (link_work) {
- DEQ_REMOVE_HEAD(link->work_list);
- link_work->processing = true;
+ case QDR_LINK_WORK_SECOND_DETACH :
+ core->detach_handler(core->user_context, link, link_work->error, false, link_work->close_link);
+ free_link = true;
+ break;
}
+
+ sys_mutex_lock(conn->work_lock);
+ if (link_work->work_type == QDR_LINK_WORK_DELIVERY && link_work->value > 0 && !link->detach_received) {
+ DEQ_INSERT_HEAD(link->work_list, link_work);
+ link_work->processing = false;
+ link_work = 0; // Halt work processing
+ } else {
+ qdr_error_free(link_work->error);
+ free_qdr_link_work_t(link_work);
+ link_work = DEQ_HEAD(link->work_list);
+ if (link_work) {
+ DEQ_REMOVE_HEAD(link->work_list);
+ link_work->processing = true;
+ }
+ }
+ sys_mutex_unlock(conn->work_lock);
+ event_count++;
}
- sys_mutex_unlock(conn->work_lock);
- event_count++;
- }
- if (free_link)
- qdr_link_delete(link);
- }
- } while (free_link || link);
+ if (free_link)
+ qdr_link_delete(link);
+ }
+ } while (free_link || link);
+ }
return event_count;
}
@@ -569,7 +574,8 @@ void qdr_link_enqueue_work_CT(qdr_core_t *core,
sys_mutex_lock(conn->work_lock);
DEQ_INSERT_TAIL(link->work_list, work);
- qdr_add_link_ref(&conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+ // Enqueue work at priority 0.
+ qdr_add_link_ref(conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
sys_mutex_unlock(conn->work_lock);
qdr_connection_activate_CT(core, conn);
@@ -811,7 +817,9 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
if (link->link_type == QD_LINK_CONTROL)
core->control_links_by_mask_bit[conn->mask_bit] = 0;
if (link->link_type == QD_LINK_ROUTER)
- core->data_links_by_mask_bit[conn->mask_bit] = 0;
+ for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority)
+ if (link == core->data_links_by_mask_bit[conn->mask_bit].links[priority])
+ core->data_links_by_mask_bit[conn->mask_bit].links[priority] = 0;
}
//
@@ -844,7 +852,9 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
//
qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
sys_mutex_lock(conn->work_lock);
- qdr_del_link_ref(&conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+ for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
+ qdr_del_link_ref(conn->links_with_work + priority, link, QDR_LINK_LIST_CLASS_WORK);
+ }
sys_mutex_unlock(conn->work_lock);
//
@@ -1260,12 +1270,15 @@ static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, boo
if (!conn->incoming) {
//
// The connector-side of inter-router/edge-uplink connections is responsible for setting up the
- // inter-router links: Two (in and out) for control, two for routed-message transfer.
+ // inter-router links: Two (in and out) for control, 2 * QDR_N_PRIORITIES for routed-message transfer.
//
(void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING, qdr_terminus_router_control(), qdr_terminus_router_control());
(void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING, qdr_terminus_router_control(), qdr_terminus_router_control());
- (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_INCOMING, qdr_terminus_router_data(), qdr_terminus_router_data());
- (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data());
+
+ for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
+ (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_INCOMING, qdr_terminus_router_data(), qdr_terminus_router_data());
+ (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data());
+ }
}
}
@@ -1320,10 +1333,13 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
//
// Remove the references in the links_with_work list
//
- qdr_link_ref_t *link_ref = DEQ_HEAD(conn->links_with_work);
- while (link_ref) {
- qdr_del_link_ref(&conn->links_with_work, link_ref->link, QDR_LINK_LIST_CLASS_WORK);
- link_ref = DEQ_HEAD(conn->links_with_work);
+ qdr_link_ref_t *link_ref;
+ for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
+ link_ref = DEQ_HEAD(conn->links_with_work[priority]);
+ while (link_ref) {
+ qdr_del_link_ref(conn->links_with_work + priority, link_ref->link, QDR_LINK_LIST_CLASS_WORK);
+ link_ref = DEQ_HEAD(conn->links_with_work[priority]);
+ }
}
//
@@ -1411,8 +1427,14 @@ static void qdr_detach_link_control_CT(qdr_core_t *core, qdr_connection_t *conn,
//
static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
{
- if (conn->role == QDR_ROLE_INTER_ROUTER)
- core->data_links_by_mask_bit[conn->mask_bit] = link;
+ if (conn->role == QDR_ROLE_INTER_ROUTER) {
+ int next_slot = core->data_links_by_mask_bit[conn->mask_bit].count ++;
+ if (next_slot >= QDR_N_PRIORITIES) {
+ qd_log(core->log, QD_LOG_ERROR, "Attempt to attach too many inter-router links for priority sheaf.");
+ return;
+ }
+ core->data_links_by_mask_bit[conn->mask_bit].links[next_slot] = link;
+ }
//
// TODO - This needs to be refactored in terms of a non-inter-router link type
@@ -1451,8 +1473,12 @@ static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qd
static void qdr_detach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
{
if (conn->role == QDR_ROLE_INTER_ROUTER)
- core->data_links_by_mask_bit[conn->mask_bit] = 0;
-
+ for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
+ if (link == core->data_links_by_mask_bit[conn->mask_bit].links[priority]) {
+ core->data_links_by_mask_bit[conn->mask_bit].links[priority] = 0;
+ break;
+ }
+ }
//
// TODO - This needs to be refactored in terms of a non-inter-router link type
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 2840725..4364c47 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -24,6 +24,28 @@
#include "forwarder.h"
+static qdr_link_t * peer_data_link(qdr_core_t *core,
+ qdr_node_t *node,
+ int priority)
+{
+ int nlmb = node->link_mask_bit;
+
+ if (nlmb < 0 || priority < 0)
+ return 0;
+
+ // Try to return the requested priority link, but if it does
+ // not exist, return the closest one that is lower.
+ qdr_link_t * link = 0;
+ while (1) {
+ if ((link = core->data_links_by_mask_bit[nlmb].links[priority]))
+ return link;
+ if (-- priority < 0)
+ return 0;
+ }
+ return link;
+}
+
+
//==================================================================================
// Built-in Forwarders
//==================================================================================
@@ -153,7 +175,7 @@ static void qdr_forward_drop_presettled_CT_LH(qdr_core_t *core, qdr_link_t *link
}
-void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery_t *out_dlv)
+void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery_t *out_dlv, int priority)
{
sys_mutex_lock(out_link->conn->work_lock);
@@ -186,7 +208,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery
work->value = 1;
DEQ_INSERT_TAIL(out_link->work_list, work);
}
- qdr_add_link_ref(&out_link->conn->links_with_work, out_link, QDR_LINK_LIST_CLASS_WORK);
+ qdr_add_link_ref(out_link->conn->links_with_work + priority, out_link, QDR_LINK_LIST_CLASS_WORK);
out_dlv->link_work = work;
sys_mutex_unlock(out_link->conn->work_lock);
@@ -243,6 +265,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
qd_bitmask_t *link_exclusion = !!in_delivery ? in_delivery->link_exclusion : 0;
bool presettled = !!in_delivery ? in_delivery->settled : true;
bool receive_complete = qd_message_receive_complete(qdr_delivery_message(in_delivery));
+ int priority = qd_message_get_priority(msg);
//
// If the delivery is not presettled, set the settled flag for forwarding so all
@@ -262,7 +285,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
while (link_ref) {
qdr_link_t *out_link = link_ref->link;
qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
- qdr_forward_deliver_CT(core, out_link, out_delivery);
+ qdr_forward_deliver_CT(core, out_link, out_delivery, priority);
fanout++;
if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type != QD_LINK_ROUTER) {
addr->deliveries_egress++;
@@ -321,7 +344,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
else
next_node = rnode;
- dest_link = control ? PEER_CONTROL_LINK(core, next_node) : PEER_DATA_LINK(core, next_node);
+ dest_link = control ? PEER_CONTROL_LINK(core, next_node) : peer_data_link(core, next_node, priority);
if (dest_link && qd_bitmask_value(rnode->valid_origins, origin))
qd_bitmask_set_bit(link_set, dest_link->conn->mask_bit);
}
@@ -334,10 +357,10 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
qd_bitmask_clear_bit(link_set, link_bit);
dest_link = control ?
core->control_links_by_mask_bit[link_bit] :
- core->data_links_by_mask_bit[link_bit];
+ core->data_links_by_mask_bit[link_bit].links[priority];
if (dest_link && (!link_exclusion || qd_bitmask_value(link_exclusion, link_bit) == 0)) {
qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, dest_link, msg);
- qdr_forward_deliver_CT(core, dest_link, out_delivery);
+ qdr_forward_deliver_CT(core, dest_link, out_delivery, priority);
fanout++;
addr->deliveries_transit++;
if (dest_link->link_type == QD_LINK_ROUTER)
@@ -462,7 +485,7 @@ int qdr_forward_closest_CT(qdr_core_t *core,
if (link_ref) {
out_link = link_ref->link;
out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
- qdr_forward_deliver_CT(core, out_link, out_delivery);
+ qdr_forward_deliver_CT(core, out_link, out_delivery, qd_message_get_priority(msg));
//
// If there are multiple local subscribers, rotate the list of link references
@@ -509,10 +532,11 @@ int qdr_forward_closest_CT(qdr_core_t *core,
else
next_node = rnode;
- out_link = control ? PEER_CONTROL_LINK(core, next_node) : PEER_DATA_LINK(core, next_node);
+ uint8_t priority = qd_message_get_priority(msg);
+ out_link = control ? PEER_CONTROL_LINK(core, next_node) : peer_data_link(core, next_node, priority);
if (out_link) {
out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
- qdr_forward_deliver_CT(core, out_link, out_delivery);
+ qdr_forward_deliver_CT(core, out_link, out_delivery, priority);
addr->deliveries_transit++;
if (out_link->link_type == QD_LINK_ROUTER)
core->deliveries_transit++;
@@ -613,7 +637,8 @@ int qdr_forward_balanced_CT(qdr_core_t *core,
for (QD_BITMASK_EACH(addr->rnodes, node_bit, c)) {
qdr_node_t *rnode = core->routers_by_mask_bit[node_bit];
qdr_node_t *next_node = rnode->next_hop ? rnode->next_hop : rnode;
- qdr_link_t *link = PEER_DATA_LINK(core, next_node);
+ uint8_t priority = qd_message_get_priority(msg);
+ qdr_link_t *link = peer_data_link(core, next_node, priority);
if (!link) continue;
int link_bit = link->conn->mask_bit;
int value = addr->outstanding_deliveries[link_bit];
@@ -660,7 +685,7 @@ int qdr_forward_balanced_CT(qdr_core_t *core,
if (chosen_link) {
qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, chosen_link, msg);
- qdr_forward_deliver_CT(core, chosen_link, out_delivery);
+ qdr_forward_deliver_CT(core, chosen_link, out_delivery, qd_message_get_priority(msg));
//
// If the delivery is unsettled and the link is inter-router, account for the outstanding delivery.
@@ -761,8 +786,9 @@ bool qdr_forward_link_balanced_CT(qdr_core_t *core,
else
next_node = rnode;
- if (next_node && PEER_DATA_LINK(core, next_node))
- conn = PEER_DATA_LINK(core, next_node)->conn;
+ qdr_link_t * pdl = peer_data_link(core, next_node, 0);
+ if (next_node && pdl)
+ conn = pdl->conn;
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 3072496..a4d3bc1 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -242,11 +242,14 @@ void qdr_route_table_setup_CT(qdr_core_t *core)
core->routers_by_mask_bit = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width());
core->control_links_by_mask_bit = NEW_PTR_ARRAY(qdr_link_t, qd_bitmask_width());
- core->data_links_by_mask_bit = NEW_PTR_ARRAY(qdr_link_t, qd_bitmask_width());
+ core->data_links_by_mask_bit = NEW_ARRAY(qdr_priority_sheaf_t, qd_bitmask_width());
for (int idx = 0; idx < qd_bitmask_width(); idx++) {
core->routers_by_mask_bit[idx] = 0;
core->control_links_by_mask_bit[idx] = 0;
- core->data_links_by_mask_bit[idx] = 0;
+ core->data_links_by_mask_bit[idx].count = 0;
+ for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority)
+ core->data_links_by_mask_bit[idx].links[priority] = 0;
+
}
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index a68e115..5e3f4c2 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -20,6 +20,7 @@
*/
#include "dispatch_private.h"
+#include "message_private.h"
#include <qpid/dispatch/router_core.h>
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/atomic.h>
@@ -290,7 +291,8 @@ DEQ_DECLARE(qdr_node_t, qdr_node_list_t);
void qdr_router_node_free(qdr_core_t *core, qdr_node_t *rnode);
#define PEER_CONTROL_LINK(c,n) ((n->link_mask_bit >= 0) ? (c)->control_links_by_mask_bit[n->link_mask_bit] : 0)
-#define PEER_DATA_LINK(c,n) ((n->link_mask_bit >= 0) ? (c)->data_links_by_mask_bit[n->link_mask_bit] : 0)
+// PEER_DATA_LINK has gotten more complex with prioritized links, and is now a function, peer_data_link().
+
struct qdr_router_ref_t {
@@ -566,7 +568,7 @@ struct qdr_connection_t {
qdr_connection_work_list_t work_list;
sys_mutex_t *work_lock;
qdr_link_ref_list_t links;
- qdr_link_ref_list_t links_with_work;
+ qdr_link_ref_list_t links_with_work[QDR_N_PRIORITIES];
char *tenant_space;
int tenant_space_len;
qdr_connection_info_t *connection_info;
@@ -662,6 +664,10 @@ struct qdr_conn_identifier_t {
ALLOC_DECLARE(qdr_conn_identifier_t);
DEQ_DECLARE(qdr_exchange_t, qdr_exchange_list_t);
+typedef struct qdr_priority_sheaf_t {
+ qdr_link_t *links[QDR_N_PRIORITIES];
+ int count;
+} qdr_priority_sheaf_t;
struct qdr_core_t {
qd_dispatch_t *qd;
@@ -738,7 +744,7 @@ struct qdr_core_t {
qd_bitmask_t *neighbor_free_mask;
qdr_node_t **routers_by_mask_bit;
qdr_link_t **control_links_by_mask_bit;
- qdr_link_t **data_links_by_mask_bit;
+ qdr_priority_sheaf_t *data_links_by_mask_bit;
uint64_t cost_epoch;
uint64_t next_tag;
@@ -838,7 +844,7 @@ void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work);
void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local);
bool qdr_is_addr_treatment_multicast(qdr_address_t *addr);
qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg);
-void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv);
+void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, int priority);
void qdr_connection_free(qdr_connection_t *conn);
void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn);
qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 2031b0d..ae1dbd9 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -743,7 +743,8 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
//
if (link->stalled_outbound) {
link->stalled_outbound = false;
- qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+ // Adding this work at priority 0.
+ qdr_add_link_ref(link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
activate = true;
}
@@ -781,7 +782,8 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
if (work)
DEQ_INSERT_TAIL(link->work_list, work);
if (DEQ_SIZE(link->undelivered) > 0 || drain_was_set) {
- qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+ // Adding this work at priority 0.
+ qdr_add_link_ref(link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
activate = true;
}
sys_mutex_unlock(link->conn->work_lock);
@@ -999,7 +1001,8 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
peer->tag_length = action->args.connection.tag_length;
memcpy(peer->tag, action->args.connection.tag, peer->tag_length);
- qdr_forward_deliver_CT(core, link->connected_link, peer);
+ // Adding this work at priority 0.
+ qdr_forward_deliver_CT(core, link->connected_link, peer, 0);
link->total_deliveries++;
@@ -1171,7 +1174,8 @@ void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv)
if (work) {
sys_mutex_lock(peer->link->conn->work_lock);
if (work->processing || work == DEQ_HEAD(peer->link->work_list)) {
- qdr_add_link_ref(&peer->link->conn->links_with_work, peer->link, QDR_LINK_LIST_CLASS_WORK);
+ // Adding this work at priority 0.
+ qdr_add_link_ref(peer->link->conn->links_with_work, peer->link, QDR_LINK_LIST_CLASS_WORK);
sys_mutex_unlock(peer->link->conn->work_lock);
//
@@ -1357,7 +1361,8 @@ void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv)
if (dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
qdr_delivery_incref(dlv, "qdr_delivery_push_CT - add to updated list");
qdr_add_delivery_ref_CT(&link->updated_deliveries, dlv);
- qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+ // Adding this work at priority 0.
+ qdr_add_link_ref(link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
activate = true;
}
sys_mutex_unlock(link->conn->work_lock);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org