You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/01/15 19:54:53 UTC

[2/3] qpid-dispatch git commit: DISPATCH-179 - Added delivery and flow linkage between core and node.

DISPATCH-179 - Added delivery and flow linkage between core and node.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/88842e3b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/88842e3b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/88842e3b

Branch: refs/heads/tross-DISPATCH-179-1
Commit: 88842e3b774d75206086687aac7c5e7785653a24
Parents: f0bfea6
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Jan 14 12:42:17 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Jan 14 12:42:17 2016 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h   |  2 +-
 src/router_core/connections.c         | 34 +++++++++++++++++++++++++-----
 src/router_core/forwarder.c           | 26 +++++++++++++++++++++--
 src/router_core/router_core_private.h | 17 +++++++++++----
 src/router_core/transfer.c            | 28 ++++++++++++++++++++++++
 src/router_node.c                     | 11 ++++++++--
 tests/config-2/A.conf                 |  5 +++++
 7 files changed, 109 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/88842e3b/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index b0119eb..9cd97e2 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -489,7 +489,7 @@ typedef void (*qdr_link_first_attach_t)  (void *context, qdr_connection_t *conn,
 typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link,
                                           qdr_terminus_t *source, qdr_terminus_t *target);
 typedef void (*qdr_link_detach_t)        (void *context, qdr_link_t *link, qdr_error_t *error);
-typedef void (*qdr_link_flow_t)          (void *context, qdr_link_t *link);
+typedef void (*qdr_link_flow_t)          (void *context, qdr_link_t *link, int credit);
 typedef void (*qdr_link_offer_t)         (void *context, qdr_link_t *link, int delivery_count);
 typedef void (*qdr_link_drained_t)       (void *context, qdr_link_t *link);
 typedef void (*qdr_link_push_t)          (void *context, qdr_link_t *link);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/88842e3b/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 6adaa7e..762570d 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -114,11 +114,14 @@ void *qdr_connection_get_context(const qdr_connection_t *conn)
 int qdr_connection_process(qdr_connection_t *conn)
 {
     qdr_connection_work_list_t  work_list;
+    qdr_link_ref_list_t         links_with_deliveries;
+    qdr_link_ref_list_t         links_with_credit;
     qdr_core_t                 *core = conn->core;
 
     sys_mutex_lock(conn->work_lock);
     DEQ_MOVE(conn->work_list, work_list);
-    // TODO - Grab the list of links with deliveries
+    DEQ_MOVE(conn->links_with_deliveries, links_with_deliveries);
+    DEQ_MOVE(conn->links_with_credit, links_with_credit);
     sys_mutex_unlock(conn->work_lock);
 
     int event_count = DEQ_SIZE(work_list);
@@ -147,7 +150,20 @@ int qdr_connection_process(qdr_connection_t *conn)
         work = DEQ_HEAD(work_list);
     }
 
-    // TODO - Invoke the push handler for each link with deliveries
+    qdr_link_ref_t *ref = DEQ_HEAD(links_with_deliveries);
+    while (ref) {
+        core->push_handler(core->user_context, ref->link);
+        qdr_del_link_ref(&links_with_deliveries, ref->link, QDR_LINK_LIST_CLASS_DELIVERY);
+        ref = DEQ_HEAD(links_with_deliveries);
+    }
+
+    ref = DEQ_HEAD(links_with_credit);
+    while (ref) {
+        core->flow_handler(core->user_context, ref->link, ref->link->incremental_credit);
+        ref->link->incremental_credit = 0;
+        qdr_del_link_ref(&links_with_credit, ref->link, QDR_LINK_LIST_CLASS_FLOW);
+        ref = DEQ_HEAD(links_with_credit);
+    }
 
     return event_count;
 }
@@ -223,6 +239,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
     link->conn = conn;
     link->name = (char*) malloc(strlen(name));
     strcpy(link->name, name);
+    link->link_direction = dir;
+    link->capacity = 32;  // TODO - make this configurable
 
     link->strip_annotations_in  = conn->strip_annotations_in;
     link->strip_annotations_out = conn->strip_annotations_out;
@@ -258,6 +276,7 @@ void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error)
 {
     qdr_action_t *action = qdr_action(qdr_link_inbound_detach_CT, "link_detach");
 
+    action->args.connection.conn   = link->conn;
     action->args.connection.link   = link;
     action->args.connection.error  = error;
     action->args.connection.dt     = dt;
@@ -374,6 +393,7 @@ static qdr_link_t *qdr_create_link_CT(qdr_core_t       *core,
     link->conn           = conn;
     link->link_type      = link_type;
     link->link_direction = dir;
+    link->capacity       = 32; // TODO - make this configurable
     link->name           = (char*) malloc(QDR_DISCRIMINATOR_SIZE + 8);
     qdr_generate_link_name("qdlink", link->name, QDR_DISCRIMINATOR_SIZE + 8);
 
@@ -682,6 +702,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
             if (qdr_terminus_is_anonymous(target)) {
                 link->owning_addr = 0;
                 qdr_link_outbound_second_attach_CT(core, link, source, target);
+                qdr_link_issue_credit_CT(core, link, link->capacity);
 
             } else {
                 //
@@ -712,6 +733,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                     link->owning_addr = addr;
                     qdr_add_link_ref(&addr->inlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
                     qdr_link_outbound_second_attach_CT(core, link, source, target);
+                    qdr_link_issue_credit_CT(core, link, link->capacity);
                 }
             }
             break;
@@ -723,10 +745,12 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
 
         case QD_LINK_CONTROL:
             qdr_link_outbound_second_attach_CT(core, link, source, target);
+            qdr_link_issue_credit_CT(core, link, link->capacity);
             break;
 
         case QD_LINK_ROUTER:
             qdr_link_outbound_second_attach_CT(core, link, source, target);
+            qdr_link_issue_credit_CT(core, link, link->capacity);
             break;
         }
     } else {
@@ -811,16 +835,16 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
     if (discard)
         return;
 
-    qdr_connection_t *conn   = action->args.connection.conn;
     qdr_link_t       *link   = action->args.connection.link;
-    qd_direction_t    dir    = action->args.connection.dir;
+    qdr_connection_t *conn   = link->conn;
     qdr_terminus_t   *source = action->args.connection.source;
     qdr_terminus_t   *target = action->args.connection.target;
 
-    if (dir == QD_INCOMING) {
+    if (link->link_direction == QD_INCOMING) {
         //
         // Handle incoming link cases
         //
+        qdr_link_issue_credit_CT(core, link, link->capacity);
         switch (link->link_type) {
         case QD_LINK_ENDPOINT:
             break;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/88842e3b/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 4fedce8..92c3074 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -58,7 +58,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *pe
     dlv->settled = !peer || peer->settled;
     dlv->tag     = core->next_tag++;
 
-    if (peer->peer == 0)
+    if (peer && peer->peer == 0)
         peer->peer = dlv;  // TODO - make this a back-list for multicast tracking
 
     return dlv;
@@ -69,12 +69,12 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *
 {
     sys_mutex_lock(link->conn->work_lock);
     DEQ_INSERT_TAIL(link->undelivered, dlv);
-    sys_mutex_unlock(link->conn->work_lock);
 
     //
     // If the link isn't already on the links_with_deliveries list, put it there.
     //
     qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
+    sys_mutex_unlock(link->conn->work_lock);
 
     //
     // Activate the outgoing connection for later processing.
@@ -83,6 +83,23 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *
 }
 
 
+void qdr_forward_on_message(qdr_core_t *core, qdr_general_work_t *work)
+{
+    work->on_message(work->on_message_context, work->msg, work->maskbit);
+}
+
+
+void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_link_t *link, qd_message_t *msg)
+{
+    qdr_general_work_t *work = qdr_general_work(qdr_forward_on_message);
+    work->on_message         = sub->on_message;
+    work->on_message_context = sub->on_message_context;
+    work->msg                = qd_message_copy(msg);
+    work->maskbit            = link ? link->conn->mask_bit : 0;
+    qdr_post_general_work_CT(core, work);
+}
+
+
 int qdr_forward_multicast_CT(qdr_core_t      *core,
                              qdr_address_t   *addr,
                              qd_message_t    *msg,
@@ -181,6 +198,11 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
         //
         // Forward to in-process subscribers
         //
+        qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions);
+        while (sub) {
+            qdr_forward_on_message_CT(core, sub, in_delivery ? in_delivery->link : 0, msg);
+            sub = DEQ_NEXT(sub);
+        }
     }
 
     return fanout;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/88842e3b/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 0ef3db4..be006b8 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -181,7 +181,8 @@ DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t);
 
 #define QDR_LINK_LIST_CLASS_ADDRESS  0
 #define QDR_LINK_LIST_CLASS_DELIVERY 1
-#define QDR_LINK_LIST_CLASSES        2
+#define QDR_LINK_LIST_CLASS_FLOW     2
+#define QDR_LINK_LIST_CLASSES        3
 
 struct qdr_link_t {
     DEQ_LINKS(qdr_link_t);
@@ -198,6 +199,9 @@ struct qdr_link_t {
     qdr_delivery_list_t  unsettled;       ///< Unsettled deliveries
     bool                 strip_annotations_in;
     bool                 strip_annotations_out;
+    int                  capacity;
+    int                  incremental_credit_CT;
+    int                  incremental_credit;
 };
 
 ALLOC_DECLARE(qdr_link_t);
@@ -211,6 +215,9 @@ struct qdr_link_ref_t {
 ALLOC_DECLARE(qdr_link_ref_t);
 DEQ_DECLARE(qdr_link_ref_t, qdr_link_ref_list_t);
 
+void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
+void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
+
 
 struct qdr_lrp_t {
     DEQ_LINKS(qdr_lrp_t);
@@ -270,9 +277,6 @@ DEQ_DECLARE(qdr_address_t, qdr_address_list_t);
 qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_semantics_t semantics);
 qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const char *addr, qd_address_semantics_t semantics);
 
-void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
-void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
-
 void qdr_add_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
 void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
 
@@ -293,6 +297,9 @@ struct qdr_general_work_t {
     qdr_general_work_handler_t  handler;
     qdr_field_t                *field;
     int                         maskbit;
+    qdr_receive_t               on_message;
+    void                       *on_message_context;
+    qd_message_t               *msg;
 };
 
 ALLOC_DECLARE(qdr_general_work_t);
@@ -341,6 +348,7 @@ struct qdr_connection_t {
     qdr_connection_work_list_t  work_list;
     sys_mutex_t                *work_lock;
     qdr_link_ref_list_t         links_with_deliveries;
+    qdr_link_ref_list_t         links_with_credit;
 };
 
 ALLOC_DECLARE(qdr_connection_t);
@@ -426,6 +434,7 @@ void  qdr_agent_setup_CT(qdr_core_t *core);
 void  qdr_forwarder_setup_CT(qdr_core_t *core);
 qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char *label);
 void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
+void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit);
 void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
 
 void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/88842e3b/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index c459628..a144adb 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -199,6 +199,34 @@ qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery)
 // In-Thread Functions
 //==================================================================================
 
+/**
+ * Check the link's accumulated credit.  If the credit given to the connection thread
+ * has been issued to Proton, provide the next batch of credit to the connection thread.
+ */
+void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit)
+{
+    link->incremental_credit_CT += credit;
+
+    if (link->incremental_credit_CT && link->incremental_credit == 0) {
+        //
+        // Move the credit from the core-thread value to the connection-thread value.
+        //
+        link->incremental_credit    = link->incremental_credit_CT;
+        link->incremental_credit_CT = 0;
+
+        //
+        // Put this link on the connection's has-credit list.
+        //
+        qdr_add_link_ref(&link->conn->links_with_credit, link, QDR_LINK_LIST_CLASS_FLOW);
+
+        //
+        // Activate the connection
+        //
+        qdr_connection_activate_CT(core, link->conn);
+    }
+}
+
+
 static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     if (discard)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/88842e3b/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index c6c3403..c6ecb7a 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -285,8 +285,10 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd
             if (!addr_iter)
                 addr_iter = qd_message_field_iterator(msg, QD_FIELD_TO);
 
-            if (addr_iter)
+            if (addr_iter) {
+                qd_address_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
                 delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd));
+            }
         } else
             delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd));
 
@@ -617,8 +619,12 @@ static void qd_router_link_detach(void *context, qdr_link_t *link, qdr_error_t *
 }
 
 
-static void qd_router_link_flow(void *context, qdr_link_t *link)
+static void qd_router_link_flow(void *context, qdr_link_t *link, int credit)
 {
+    qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
+    pn_link_t *plink = qd_link_pn(qlink);
+
+    pn_link_flow(plink, credit);
 }
 
 
@@ -634,6 +640,7 @@ static void qd_router_link_drained(void *context, qdr_link_t *link)
 
 static void qd_router_link_push(void *context, qdr_link_t *link)
 {
+    printf("qd_router_link_push\n");
     qd_router_t *router      = (qd_router_t*) context;
     qd_link_t   *qlink       = (qd_link_t*) qdr_link_get_context(link);
     pn_link_t   *plink       = qd_link_pn(qlink);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/88842e3b/tests/config-2/A.conf
----------------------------------------------------------------------
diff --git a/tests/config-2/A.conf b/tests/config-2/A.conf
index a5b948d..4a5ccc9 100644
--- a/tests/config-2/A.conf
+++ b/tests/config-2/A.conf
@@ -91,3 +91,8 @@ log {
     enable: trace+
 }
 
+log {
+    module: ROUTER_HELLO
+    enable: trace+
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org