You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/09/10 19:16:14 UTC

qpid-dispatch git commit: DISPATCH-1096 - priority messaging support

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 03ba19e65 -> 710b7059b


DISPATCH-1096 - priority messaging support


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/710b7059
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/710b7059
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/710b7059

Branch: refs/heads/master
Commit: 710b7059b77a72337a25c79b1e4e01856ca484b7
Parents: 03ba19e
Author: Michael Goulish <mg...@redhat.com>
Authored: Mon Sep 10 11:53:37 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon Sep 10 14:19:25 2018 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/iterator.h      |   2 +
 include/qpid/dispatch/message.h       |   7 +
 src/iterator.c                        |   8 ++
 src/message.c                         |  90 +++++++++---
 src/message_private.h                 |  11 ++
 src/router_core/connections.c         | 216 ++++++++++++++++-------------
 src/router_core/forwarder.c           |  52 +++++--
 src/router_core/route_tables.c        |   7 +-
 src/router_core/router_core_private.h |  14 +-
 src/router_core/transfer.c            |  15 +-
 10 files changed, 287 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/include/qpid/dispatch/iterator.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/iterator.h b/include/qpid/dispatch/iterator.h
index fa4267c..74d73ca 100644
--- a/include/qpid/dispatch/iterator.h
+++ b/include/qpid/dispatch/iterator.h
@@ -327,6 +327,8 @@ int qd_iterator_ncopy(qd_iterator_t *iter, unsigned char* buffer, int n);
  */
 unsigned char *qd_iterator_copy(qd_iterator_t *iter);
 
+uint8_t qd_iterator_uint8(qd_iterator_t *iter);
+
 /**
  * Return a new iterator that is a duplicate of the original iterator, referring
  * to the same base data.  If the input iterator pointer is NULL, the duplicate

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/include/qpid/dispatch/message.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index ca2ab47..ec0b901 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -411,6 +411,13 @@ bool qd_message_aborted(const qd_message_t *msg);
  */
 void qd_message_set_aborted(const qd_message_t *msg, bool aborted);
 
+/**
+ * Return message priority
+ * @param msg A pointer to the message
+ */
+uint8_t qd_message_get_priority(qd_message_t *msg);
+
+
 ///@}
 
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/iterator.c
----------------------------------------------------------------------
diff --git a/src/iterator.c b/src/iterator.c
index 8889b17..910aa19 100644
--- a/src/iterator.c
+++ b/src/iterator.c
@@ -768,6 +768,14 @@ char* qd_iterator_strncpy(qd_iterator_t *iter, char* buffer, int n)
 }
 
 
+uint8_t qd_iterator_uint8(qd_iterator_t *iter ) {
+    qd_iterator_reset(iter);
+    if (qd_iterator_end(iter))
+        return 0;
+    return (uint8_t) qd_iterator_octet(iter);
+}
+
+
 unsigned char *qd_iterator_copy(qd_iterator_t *iter)
 {
     if (!iter)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index fce3394..35363cf 100644
--- a/src/message.c
+++ b/src/message.c
@@ -724,22 +724,57 @@ static qd_field_location_t *qd_message_properties_field(qd_message_t *msg, qd_me
 // else 0)
 static qd_field_location_t *qd_message_header_field(qd_message_t *msg, qd_message_field_t field)
 {
-    qd_message_content_t *content = MSG_CONTENT(msg);
-
-    if (!content->section_message_header.parsed) {
-        if (!qd_message_check(msg, QD_DEPTH_HEADER) || !content->section_message_header.parsed)
-            return 0;
-    }
-
-    switch (field) {
-    case QD_FIELD_HEADER:
-        return &content->section_message_properties;
-    default:
-        // TBD: add header fields as needed (see qd_message_properties_field()
-        // as an example)
-        assert(false);
+    int first_header_field = QD_FIELD_DURABLE,
+        last_header_field  = QD_FIELD_DELIVERY_COUNT;
+    static const intptr_t offsets[] = {
+        // position of the fields' qd_field_location_t in the message content object
+        (intptr_t) &((qd_message_content_t *)0)->field_durable,
+        (intptr_t) &((qd_message_content_t *)0)->field_priority,
+        (intptr_t) &((qd_message_content_t *)0)->field_ttl,
+        (intptr_t) &((qd_message_content_t *)0)->field_first_acquirer,
+        (intptr_t) &((qd_message_content_t *)0)->field_delivery_count
+    };
+    if (!(first_header_field <= field && field <= last_header_field)) {
+        assert ( 0 );
         return 0;
     }
+
+    qd_message_content_t *content = MSG_CONTENT(msg);
+     if (!content->section_message_header.parsed) {
+         if (!qd_message_check(msg, QD_DEPTH_HEADER) || !content->section_message_header.parsed)
+             return 0;
+     }
+    // If it's already been parsed, just return it.
+    const int index = field - first_header_field;
+    qd_field_location_t *const location = (qd_field_location_t *)((char *)content + offsets[index]);
+    if (location->parsed)
+        return location;
+    // requested field not parsed out.  Need to parse out up to the requested field:
+    qd_field_location_t section = content->section_message_header;
+    qd_buffer_t   *buffer = section.buffer;
+    unsigned char *cursor = qd_buffer_base(buffer) + section.offset;
+    advance(&cursor, &buffer, section.hdr_length, 0, 0);
+    int start = start_list(&cursor, &buffer);
+    if (index > start)
+        return 0;  // properties list too short
+    // Make sure that all fields up to the requested one are parsed.
+    int position = 0;
+    while (position < index) {
+        qd_field_location_t *f = (qd_field_location_t *)((char *)content + offsets[position]);
+        // If it's parsed, advance over it. If not, parse it.
+        if (f->parsed)
+            advance(&cursor, &buffer, f->hdr_length + f->length, 0, 0);
+        else
+        if (!traverse_field(&cursor, &buffer, f))
+            return 0;
+        position++;
+     }
+    // all fields previous to the target have now been parsed and cursor/buffer
+    // are in the correct position, parse out the field:
+    if (traverse_field(&cursor, &buffer, location))
+        return location;
+    else
+         return 0;
 }
 
 
@@ -892,7 +927,8 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
     copy->ma_phase = msg->ma_phase;
     copy->strip_annotations_in  = msg->strip_annotations_in;
 
-    copy->content = content;
+    copy->content           = content;
+    copy->content->priority = content->priority;
 
     copy->sent_depth    = QD_DEPTH_NONE;
     copy->cursor.buffer = 0;
@@ -1018,6 +1054,28 @@ void qd_message_add_fanout(qd_message_t *in_msg)
     sys_atomic_inc(&msg->content->fanout);
 }
 
+static void message_set_priority(qd_message_t *in_msg, uint8_t priority)
+{
+    qd_message_pvt_t *msg  = (qd_message_pvt_t*) in_msg;
+    msg->content->priority = priority < QDR_N_PRIORITIES ? priority : QDR_N_PRIORITIES - 1;
+}
+
+uint8_t qd_message_get_priority(qd_message_t *in_msg)
+{
+    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+
+    uint8_t  priority = 0;
+    qd_iterator_t *priority_iterator = qd_message_field_iterator(in_msg, QD_FIELD_PRIORITY);
+    if (priority_iterator) {
+        if (qd_iterator_remaining(priority_iterator) > 0) {
+            priority = qd_iterator_uint8(priority_iterator);
+            message_set_priority(in_msg, priority);
+        }
+    }
+    qd_iterator_free(priority_iterator);
+    return msg->content->priority;
+}
+
 bool qd_message_receive_complete(qd_message_t *in_msg)
 {
     if (!in_msg)
@@ -1800,7 +1858,7 @@ void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *b
 
     qd_compose_start_list(field);
     qd_compose_insert_bool(field, 0);     // durable
-    //qd_compose_insert_null(field);        // priority
+    qd_compose_insert_null(field);        // priority
     //qd_compose_insert_null(field);        // ttl
     //qd_compose_insert_boolean(field, 0);  // first-acquirer
     //qd_compose_insert_uint(field, 0);     // delivery-count

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index fe8147f..14d2593 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -77,6 +77,14 @@ typedef struct {
     qd_field_location_t  section_body;                    // The message body: Data
     qd_field_location_t  section_footer;                  // The footer
     qd_field_location_t  field_user_annotations;          // Opaque user message annotations, not a real field.
+
+    // header fields
+    qd_field_location_t  field_durable;
+    qd_field_location_t  field_priority;
+    qd_field_location_t  field_ttl;
+    qd_field_location_t  field_first_acquirer;
+    qd_field_location_t  field_delivery_count;
+
     qd_field_location_t  field_message_id;                // The string value of the message-id
     qd_field_location_t  field_user_id;                   // The string value of the user-id
     qd_field_location_t  field_to;                        // The string value of the to field
@@ -115,6 +123,7 @@ typedef struct {
     bool                 q2_input_holdoff;               // hold off calling pn_link_recv
     bool                 aborted;                        // receive completed with abort flag set
     bool                 disable_q2_holdoff;             // Disable the Q2 flow control
+    uint8_t              priority;
 } qd_message_content_t;
 
 typedef struct {
@@ -142,6 +151,8 @@ void qd_message_initialize();
 
 qd_iterator_pointer_t qd_message_cursor(qd_message_pvt_t *msg);
 
+#define QDR_N_PRIORITIES 10
+
 ///@}
 
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 786877e..9716d7e 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -205,7 +205,7 @@ const char *qdr_connection_get_tenant_space(const qdr_connection_t *conn, int *l
 int qdr_connection_process(qdr_connection_t *conn)
 {
     qdr_connection_work_list_t  work_list;
-    qdr_link_ref_list_t         links_with_work;
+    qdr_link_ref_list_t         links_with_work[QDR_N_PRIORITIES];
     qdr_core_t                 *core = conn->core;
 
     qdr_link_ref_t *ref;
@@ -216,7 +216,9 @@ int qdr_connection_process(qdr_connection_t *conn)
 
     sys_mutex_lock(conn->work_lock);
     DEQ_MOVE(conn->work_list, work_list);
-    DEQ_MOVE(conn->links_with_work, links_with_work);
+    for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
+        DEQ_MOVE(conn->links_with_work[priority], links_with_work[priority]);
+    }
     sys_mutex_unlock(conn->work_lock);
 
     event_count += DEQ_SIZE(work_list);
@@ -241,97 +243,100 @@ int qdr_connection_process(qdr_connection_t *conn)
         work = DEQ_HEAD(work_list);
     }
 
-    do {
-        qdr_link_work_t *link_work;
-        free_link = false;
+    // Process the links_with_work array from highest to lowest priority.
+    for (int priority = QDR_N_PRIORITIES - 1; priority >= 0; -- priority) {
+        do {
+            qdr_link_work_t *link_work;
+            free_link = false;
 
-        sys_mutex_lock(conn->work_lock);
-        ref = DEQ_HEAD(links_with_work);
-        if (ref) {
-            link = ref->link;
-            qdr_del_link_ref(&links_with_work, ref->link, QDR_LINK_LIST_CLASS_WORK);
+            sys_mutex_lock(conn->work_lock);
+            ref = DEQ_HEAD(links_with_work[priority]);
+            if (ref) {
+                link = ref->link;
+                qdr_del_link_ref(links_with_work + priority, ref->link, QDR_LINK_LIST_CLASS_WORK);
+
+                link_work = DEQ_HEAD(link->work_list);
+                if (link_work) {
+                    DEQ_REMOVE_HEAD(link->work_list);
+                    link_work->processing = true;
+                }
+            } else
+                link = 0;
+            sys_mutex_unlock(conn->work_lock);
 
-            link_work = DEQ_HEAD(link->work_list);
-            if (link_work) {
-                DEQ_REMOVE_HEAD(link->work_list);
-                link_work->processing = true;
-            }
-        } else
-            link = 0;
-        sys_mutex_unlock(conn->work_lock);
+            if (link) {
 
-        if (link) {
+                //
+                // Handle disposition/settlement updates
+                //
+                qdr_delivery_ref_list_t updated_deliveries;
+                sys_mutex_lock(conn->work_lock);
+                DEQ_MOVE(link->updated_deliveries, updated_deliveries);
+                sys_mutex_unlock(conn->work_lock);
 
-            //
-            // Handle disposition/settlement updates
-            //
-            qdr_delivery_ref_list_t updated_deliveries;
-            sys_mutex_lock(conn->work_lock);
-            DEQ_MOVE(link->updated_deliveries, updated_deliveries);
-            sys_mutex_unlock(conn->work_lock);
+                qdr_delivery_ref_t *dref = DEQ_HEAD(updated_deliveries);
+                while (dref) {
+                    core->delivery_update_handler(core->user_context, dref->dlv, dref->dlv->disposition, dref->dlv->settled);
+                    qdr_delivery_decref(core, dref->dlv, "qdr_connection_process - remove from updated list");
+                    qdr_del_delivery_ref(&updated_deliveries, dref);
+                    dref = DEQ_HEAD(updated_deliveries);
+                    event_count++;
+                }
 
-            qdr_delivery_ref_t *dref = DEQ_HEAD(updated_deliveries);
-            while (dref) {
-                core->delivery_update_handler(core->user_context, dref->dlv, dref->dlv->disposition, dref->dlv->settled);
-                qdr_delivery_decref(core, dref->dlv, "qdr_connection_process - remove from updated list");
-                qdr_del_delivery_ref(&updated_deliveries, dref);
-                dref = DEQ_HEAD(updated_deliveries);
-                event_count++;
-            }
+                while (link_work) {
+                    switch (link_work->work_type) {
+                    case QDR_LINK_WORK_DELIVERY :
+                        {
+                            int count = core->push_handler(core->user_context, link, link_work->value);
+                            assert(count <= link_work->value);
+                            link_work->value -= count;
+                            break;
+                        }
 
-            while (link_work) {
-                switch (link_work->work_type) {
-                case QDR_LINK_WORK_DELIVERY :
-                    {
-                        int count = core->push_handler(core->user_context, link, link_work->value);
-                        assert(count <= link_work->value);
-                        link_work->value -= count;
+                    case QDR_LINK_WORK_FLOW :
+                        if (link_work->value > 0)
+                            core->flow_handler(core->user_context, link, link_work->value);
+                        if      (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_SET)
+                            core->drain_handler(core->user_context, link, true);
+                        else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_CLEAR)
+                            core->drain_handler(core->user_context, link, false);
+                        else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_DRAINED)
+                            core->drained_handler(core->user_context, link);
                         break;
-                    }
 
-                case QDR_LINK_WORK_FLOW :
-                    if (link_work->value > 0)
-                        core->flow_handler(core->user_context, link, link_work->value);
-                    if      (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_SET)
-                        core->drain_handler(core->user_context, link, true);
-                    else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_CLEAR)
-                        core->drain_handler(core->user_context, link, false);
-                    else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_DRAINED)
-                        core->drained_handler(core->user_context, link);
-                    break;
-
-                case QDR_LINK_WORK_FIRST_DETACH :
-                    core->detach_handler(core->user_context, link, link_work->error, true, link_work->close_link);
-                    break;
-
-                case QDR_LINK_WORK_SECOND_DETACH :
-                    core->detach_handler(core->user_context, link, link_work->error, false, link_work->close_link);
-                    free_link = true;
-                    break;
-                }
+                    case QDR_LINK_WORK_FIRST_DETACH :
+                        core->detach_handler(core->user_context, link, link_work->error, true, link_work->close_link);
+                        break;
 
-                sys_mutex_lock(conn->work_lock);
-                if (link_work->work_type == QDR_LINK_WORK_DELIVERY && link_work->value > 0 && !link->detach_received) {
-                    DEQ_INSERT_HEAD(link->work_list, link_work);
-                    link_work->processing = false;
-                    link_work = 0; // Halt work processing
-                } else {
-                    qdr_error_free(link_work->error);
-                    free_qdr_link_work_t(link_work);
-                    link_work = DEQ_HEAD(link->work_list);
-                    if (link_work) {
-                        DEQ_REMOVE_HEAD(link->work_list);
-                        link_work->processing = true;
+                    case QDR_LINK_WORK_SECOND_DETACH :
+                        core->detach_handler(core->user_context, link, link_work->error, false, link_work->close_link);
+                        free_link = true;
+                        break;
                     }
+
+                    sys_mutex_lock(conn->work_lock);
+                    if (link_work->work_type == QDR_LINK_WORK_DELIVERY && link_work->value > 0 && !link->detach_received) {
+                        DEQ_INSERT_HEAD(link->work_list, link_work);
+                        link_work->processing = false;
+                        link_work = 0; // Halt work processing
+                    } else {
+                        qdr_error_free(link_work->error);
+                        free_qdr_link_work_t(link_work);
+                        link_work = DEQ_HEAD(link->work_list);
+                        if (link_work) {
+                            DEQ_REMOVE_HEAD(link->work_list);
+                            link_work->processing = true;
+                        }
+                    }
+                    sys_mutex_unlock(conn->work_lock);
+                    event_count++;
                 }
-                sys_mutex_unlock(conn->work_lock);
-                event_count++;
-            }
 
-            if (free_link)
-                qdr_link_delete(link);
-        }
-    } while (free_link || link);
+                if (free_link)
+                    qdr_link_delete(link);
+            }
+        } while (free_link || link);
+    }
 
     return event_count;
 }
@@ -569,7 +574,8 @@ void qdr_link_enqueue_work_CT(qdr_core_t      *core,
 
     sys_mutex_lock(conn->work_lock);
     DEQ_INSERT_TAIL(link->work_list, work);
-    qdr_add_link_ref(&conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+    // Enqueue work at priority 0.
+    qdr_add_link_ref(conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
     sys_mutex_unlock(conn->work_lock);
 
     qdr_connection_activate_CT(core, conn);
@@ -811,7 +817,9 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
         if (link->link_type == QD_LINK_CONTROL)
             core->control_links_by_mask_bit[conn->mask_bit] = 0;
         if (link->link_type == QD_LINK_ROUTER)
-            core->data_links_by_mask_bit[conn->mask_bit] = 0;
+            for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority)
+                if (link == core->data_links_by_mask_bit[conn->mask_bit].links[priority])
+                    core->data_links_by_mask_bit[conn->mask_bit].links[priority] = 0;
     }
 
     //
@@ -844,7 +852,9 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
     //
     qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
     sys_mutex_lock(conn->work_lock);
-    qdr_del_link_ref(&conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+    for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
+        qdr_del_link_ref(conn->links_with_work + priority, link, QDR_LINK_LIST_CLASS_WORK);
+    }
     sys_mutex_unlock(conn->work_lock);
 
     //
@@ -1260,12 +1270,15 @@ static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, boo
             if (!conn->incoming) {
                 //
                 // The connector-side of inter-router/edge-uplink connections is responsible for setting up the
-                // inter-router links:  Two (in and out) for control, two for routed-message transfer.
+                // inter-router links:  Two (in and out) for control, 2 * QDR_N_PRIORITIES for routed-message transfer.
                 //
                 (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING, qdr_terminus_router_control(), qdr_terminus_router_control());
                 (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING, qdr_terminus_router_control(), qdr_terminus_router_control());
-                (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  QD_INCOMING, qdr_terminus_router_data(), qdr_terminus_router_data());
-                (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data());
+
+                for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
+                    (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  QD_INCOMING, qdr_terminus_router_data(), qdr_terminus_router_data());
+                    (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data());
+                }
             }
         }
 
@@ -1320,10 +1333,13 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
     //
     // Remove the references in the links_with_work list
     //
-    qdr_link_ref_t *link_ref = DEQ_HEAD(conn->links_with_work);
-    while (link_ref) {
-        qdr_del_link_ref(&conn->links_with_work, link_ref->link, QDR_LINK_LIST_CLASS_WORK);
-        link_ref = DEQ_HEAD(conn->links_with_work);
+    qdr_link_ref_t *link_ref;
+    for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
+        link_ref = DEQ_HEAD(conn->links_with_work[priority]);
+        while (link_ref) {
+            qdr_del_link_ref(conn->links_with_work + priority, link_ref->link, QDR_LINK_LIST_CLASS_WORK);
+            link_ref = DEQ_HEAD(conn->links_with_work[priority]);
+        }
     }
 
     //
@@ -1411,8 +1427,14 @@ static void qdr_detach_link_control_CT(qdr_core_t *core, qdr_connection_t *conn,
 //
 static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
 {
-    if (conn->role == QDR_ROLE_INTER_ROUTER)
-        core->data_links_by_mask_bit[conn->mask_bit] = link;
+    if (conn->role == QDR_ROLE_INTER_ROUTER) {
+        int next_slot = core->data_links_by_mask_bit[conn->mask_bit].count ++;
+        if (next_slot >= QDR_N_PRIORITIES) {
+            qd_log(core->log, QD_LOG_ERROR, "Attempt to attach too many inter-router links for priority sheaf.");
+            return;
+        }
+        core->data_links_by_mask_bit[conn->mask_bit].links[next_slot] = link;
+    }
 
     //
     // TODO - This needs to be refactored in terms of a non-inter-router link type
@@ -1451,8 +1473,12 @@ static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qd
 static void qdr_detach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
 {
     if (conn->role == QDR_ROLE_INTER_ROUTER)
-        core->data_links_by_mask_bit[conn->mask_bit] = 0;
-
+        for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
+            if (link == core->data_links_by_mask_bit[conn->mask_bit].links[priority]) {
+                core->data_links_by_mask_bit[conn->mask_bit].links[priority] = 0;
+                break;
+            }
+        }
     //
     // TODO - This needs to be refactored in terms of a non-inter-router link type
     //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 2840725..4364c47 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -24,6 +24,28 @@
 #include "forwarder.h"
 
 
+static qdr_link_t * peer_data_link(qdr_core_t *core,
+                                   qdr_node_t *node,
+                                   int         priority)
+{
+    int nlmb = node->link_mask_bit;
+
+    if (nlmb < 0 || priority < 0)
+        return 0;
+
+    // Try to return the requested priority link, but if it does
+    // not exist, return the closest one that is lower.
+    qdr_link_t * link = 0;
+    while (1) {
+        if ((link = core->data_links_by_mask_bit[nlmb].links[priority]))
+            return link;
+        if (-- priority < 0)
+            return 0;
+    }
+    return link;
+}
+
+
 //==================================================================================
 // Built-in Forwarders
 //==================================================================================
@@ -153,7 +175,7 @@ static void qdr_forward_drop_presettled_CT_LH(qdr_core_t *core, qdr_link_t *link
 }
 
 
-void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery_t *out_dlv)
+void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery_t *out_dlv, int priority)
 {
     sys_mutex_lock(out_link->conn->work_lock);
 
@@ -186,7 +208,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery
         work->value     = 1;
         DEQ_INSERT_TAIL(out_link->work_list, work);
     }
-    qdr_add_link_ref(&out_link->conn->links_with_work, out_link, QDR_LINK_LIST_CLASS_WORK);
+    qdr_add_link_ref(out_link->conn->links_with_work + priority, out_link, QDR_LINK_LIST_CLASS_WORK);
 
     out_dlv->link_work = work;
     sys_mutex_unlock(out_link->conn->work_lock);
@@ -243,6 +265,7 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
     qd_bitmask_t *link_exclusion       = !!in_delivery ? in_delivery->link_exclusion : 0;
     bool          presettled           = !!in_delivery ? in_delivery->settled : true;
     bool          receive_complete     = qd_message_receive_complete(qdr_delivery_message(in_delivery));
+    int           priority             = qd_message_get_priority(msg);
 
     //
     // If the delivery is not presettled, set the settled flag for forwarding so all
@@ -262,7 +285,7 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
         while (link_ref) {
             qdr_link_t     *out_link     = link_ref->link;
             qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
-            qdr_forward_deliver_CT(core, out_link, out_delivery);
+            qdr_forward_deliver_CT(core, out_link, out_delivery, priority);
             fanout++;
             if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type != QD_LINK_ROUTER) {
                 addr->deliveries_egress++;
@@ -321,7 +344,7 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
             else
                 next_node = rnode;
 
-            dest_link = control ? PEER_CONTROL_LINK(core, next_node) : PEER_DATA_LINK(core, next_node);
+            dest_link = control ? PEER_CONTROL_LINK(core, next_node) : peer_data_link(core, next_node, priority);
             if (dest_link && qd_bitmask_value(rnode->valid_origins, origin))
                 qd_bitmask_set_bit(link_set, dest_link->conn->mask_bit);
         }
@@ -334,10 +357,10 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
             qd_bitmask_clear_bit(link_set, link_bit);
             dest_link = control ?
                 core->control_links_by_mask_bit[link_bit] :
-                core->data_links_by_mask_bit[link_bit];
+                core->data_links_by_mask_bit[link_bit].links[priority];
             if (dest_link && (!link_exclusion || qd_bitmask_value(link_exclusion, link_bit) == 0)) {
                 qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, dest_link, msg);
-                qdr_forward_deliver_CT(core, dest_link, out_delivery);
+                qdr_forward_deliver_CT(core, dest_link, out_delivery, priority);
                 fanout++;
                 addr->deliveries_transit++;
                 if (dest_link->link_type == QD_LINK_ROUTER)
@@ -462,7 +485,7 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
     if (link_ref) {
         out_link     = link_ref->link;
         out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
-        qdr_forward_deliver_CT(core, out_link, out_delivery);
+        qdr_forward_deliver_CT(core, out_link, out_delivery, qd_message_get_priority(msg));
 
         //
         // If there are multiple local subscribers, rotate the list of link references
@@ -509,10 +532,11 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
             else
                 next_node = rnode;
 
-            out_link = control ? PEER_CONTROL_LINK(core, next_node) : PEER_DATA_LINK(core, next_node);
+            uint8_t priority = qd_message_get_priority(msg);
+            out_link = control ? PEER_CONTROL_LINK(core, next_node) : peer_data_link(core, next_node, priority);
             if (out_link) {
                 out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
-                qdr_forward_deliver_CT(core, out_link, out_delivery);
+                qdr_forward_deliver_CT(core, out_link, out_delivery, priority);
                 addr->deliveries_transit++;
                 if (out_link->link_type == QD_LINK_ROUTER)
                     core->deliveries_transit++;
@@ -613,7 +637,8 @@ int qdr_forward_balanced_CT(qdr_core_t      *core,
         for (QD_BITMASK_EACH(addr->rnodes, node_bit, c)) {
             qdr_node_t *rnode     = core->routers_by_mask_bit[node_bit];
             qdr_node_t *next_node = rnode->next_hop ? rnode->next_hop : rnode;
-            qdr_link_t *link      = PEER_DATA_LINK(core, next_node);
+            uint8_t     priority  = qd_message_get_priority(msg);
+            qdr_link_t *link      = peer_data_link(core, next_node, priority);
             if (!link) continue;
             int         link_bit  = link->conn->mask_bit;
             int         value     = addr->outstanding_deliveries[link_bit];
@@ -660,7 +685,7 @@ int qdr_forward_balanced_CT(qdr_core_t      *core,
 
     if (chosen_link) {
         qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, chosen_link, msg);
-        qdr_forward_deliver_CT(core, chosen_link, out_delivery);
+        qdr_forward_deliver_CT(core, chosen_link, out_delivery, qd_message_get_priority(msg));
 
         //
         // If the delivery is unsettled and the link is inter-router, account for the outstanding delivery.
@@ -761,8 +786,9 @@ bool qdr_forward_link_balanced_CT(qdr_core_t     *core,
                 else
                     next_node = rnode;
 
-                if (next_node && PEER_DATA_LINK(core, next_node))
-                    conn = PEER_DATA_LINK(core, next_node)->conn;
+                qdr_link_t * pdl = peer_data_link(core, next_node, 0);
+                if (next_node && pdl)
+                    conn = pdl->conn;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 3072496..a4d3bc1 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -242,11 +242,14 @@ void qdr_route_table_setup_CT(qdr_core_t *core)
 
         core->routers_by_mask_bit       = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width());
         core->control_links_by_mask_bit = NEW_PTR_ARRAY(qdr_link_t, qd_bitmask_width());
-        core->data_links_by_mask_bit    = NEW_PTR_ARRAY(qdr_link_t, qd_bitmask_width());
+        core->data_links_by_mask_bit    = NEW_ARRAY(qdr_priority_sheaf_t, qd_bitmask_width());
         for (int idx = 0; idx < qd_bitmask_width(); idx++) {
             core->routers_by_mask_bit[idx]   = 0;
             core->control_links_by_mask_bit[idx] = 0;
-            core->data_links_by_mask_bit[idx] = 0;
+            core->data_links_by_mask_bit[idx].count = 0;
+            for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority)
+              core->data_links_by_mask_bit[idx].links[priority] = 0;
+
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index a68e115..5e3f4c2 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -20,6 +20,7 @@
  */
 
 #include "dispatch_private.h"
+#include "message_private.h"
 #include <qpid/dispatch/router_core.h>
 #include <qpid/dispatch/threading.h>
 #include <qpid/dispatch/atomic.h>
@@ -290,7 +291,8 @@ DEQ_DECLARE(qdr_node_t, qdr_node_list_t);
 void qdr_router_node_free(qdr_core_t *core, qdr_node_t *rnode);
 
 #define PEER_CONTROL_LINK(c,n) ((n->link_mask_bit >= 0) ? (c)->control_links_by_mask_bit[n->link_mask_bit] : 0)
-#define PEER_DATA_LINK(c,n)    ((n->link_mask_bit >= 0) ? (c)->data_links_by_mask_bit[n->link_mask_bit] : 0)
+// PEER_DATA_LINK has gotten more complex with prioritized links, and is now a function, peer_data_link().
+
 
 
 struct qdr_router_ref_t {
@@ -566,7 +568,7 @@ struct qdr_connection_t {
     qdr_connection_work_list_t  work_list;
     sys_mutex_t                *work_lock;
     qdr_link_ref_list_t         links;
-    qdr_link_ref_list_t         links_with_work;
+    qdr_link_ref_list_t         links_with_work[QDR_N_PRIORITIES];
     char                       *tenant_space;
     int                         tenant_space_len;
     qdr_connection_info_t      *connection_info;
@@ -662,6 +664,10 @@ struct qdr_conn_identifier_t {
 ALLOC_DECLARE(qdr_conn_identifier_t);
 DEQ_DECLARE(qdr_exchange_t, qdr_exchange_list_t);
 
+typedef struct qdr_priority_sheaf_t {
+    qdr_link_t *links[QDR_N_PRIORITIES];
+    int count;
+} qdr_priority_sheaf_t;
 
 struct qdr_core_t {
     qd_dispatch_t     *qd;
@@ -738,7 +744,7 @@ struct qdr_core_t {
     qd_bitmask_t         *neighbor_free_mask;
     qdr_node_t          **routers_by_mask_bit;
     qdr_link_t          **control_links_by_mask_bit;
-    qdr_link_t          **data_links_by_mask_bit;
+    qdr_priority_sheaf_t *data_links_by_mask_bit;
     uint64_t              cost_epoch;
 
     uint64_t              next_tag;
@@ -838,7 +844,7 @@ void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work);
 void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local);
 bool qdr_is_addr_treatment_multicast(qdr_address_t *addr);
 qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg);
-void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv);
+void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, int priority);
 void qdr_connection_free(qdr_connection_t *conn);
 void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn);
 qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 2031b0d..ae1dbd9 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -743,7 +743,8 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
     //
     if (link->stalled_outbound) {
         link->stalled_outbound = false;
-        qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+        // Adding this work at priority 0.
+        qdr_add_link_ref(link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
         activate = true;
     }
 
@@ -781,7 +782,8 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
             if (work)
                 DEQ_INSERT_TAIL(link->work_list, work);
             if (DEQ_SIZE(link->undelivered) > 0 || drain_was_set) {
-                qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+                // Adding this work at priority 0.
+                qdr_add_link_ref(link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
                 activate = true;
             }
             sys_mutex_unlock(link->conn->work_lock);
@@ -999,7 +1001,8 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
         peer->tag_length = action->args.connection.tag_length;
         memcpy(peer->tag, action->args.connection.tag, peer->tag_length);
 
-        qdr_forward_deliver_CT(core, link->connected_link, peer);
+        // Adding this work at priority 0.
+        qdr_forward_deliver_CT(core, link->connected_link, peer, 0);
 
         link->total_deliveries++;
 
@@ -1171,7 +1174,8 @@ void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv)
         if (work) {
             sys_mutex_lock(peer->link->conn->work_lock);
             if (work->processing || work == DEQ_HEAD(peer->link->work_list)) {
-                qdr_add_link_ref(&peer->link->conn->links_with_work, peer->link, QDR_LINK_LIST_CLASS_WORK);
+                // Adding this work at priority 0.
+                qdr_add_link_ref(peer->link->conn->links_with_work, peer->link, QDR_LINK_LIST_CLASS_WORK);
                 sys_mutex_unlock(peer->link->conn->work_lock);
 
                 //
@@ -1357,7 +1361,8 @@ void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv)
     if (dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
         qdr_delivery_incref(dlv, "qdr_delivery_push_CT - add to updated list");
         qdr_add_delivery_ref_CT(&link->updated_deliveries, dlv);
-        qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+        // Adding this work at priority 0.
+        qdr_add_link_ref(link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
         activate = true;
     }
     sys_mutex_unlock(link->conn->work_lock);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org