You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/01/15 19:54:52 UTC
[1/3] qpid-dispatch git commit: DISPATCH-179 - Added outgoing delivery linkage
Repository: qpid-dispatch
Updated Branches:
refs/heads/tross-DISPATCH-179-1 59cc7b53d -> 6d9dc9e9d
DISPATCH-179 - Added outgoing delivery linkage
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/f0bfea6f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/f0bfea6f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/f0bfea6f
Branch: refs/heads/tross-DISPATCH-179-1
Commit: f0bfea6f30f7f9f9b3c104c74e1324d89f698fd7
Parents: 59cc7b5
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Jan 13 13:50:27 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Jan 13 13:50:27 2016 -0500
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 15 +++++-
src/router_core/connections.c | 27 +++++++----
src/router_core/forwarder.c | 30 +++++++++---
src/router_core/router_core.c | 17 ++++---
src/router_core/router_core_private.h | 17 +++++--
src/router_core/transfer.c | 74 ++++++++++++++++++++++++++++++
src/router_node.c | 47 ++++++++++++++++++-
7 files changed, 201 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f0bfea6f/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 20840cd..b0119eb 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -482,12 +482,18 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
qd_field_iterator_t *ingress, qd_field_iterator_t *addr, bool settled);
qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg);
+void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit);
+
typedef void (*qdr_link_first_attach_t) (void *context, qdr_connection_t *conn, qdr_link_t *link,
qdr_terminus_t *source, qdr_terminus_t *target);
typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link,
qdr_terminus_t *source, qdr_terminus_t *target);
typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, qdr_error_t *error);
typedef void (*qdr_link_flow_t) (void *context, qdr_link_t *link);
+typedef void (*qdr_link_offer_t) (void *context, qdr_link_t *link, int delivery_count);
+typedef void (*qdr_link_drained_t) (void *context, qdr_link_t *link);
+typedef void (*qdr_link_push_t) (void *context, qdr_link_t *link);
+typedef void (*qdr_link_deliver_t) (void *context, qdr_link_t *link, qdr_delivery_t *delivery);
void qdr_connection_handlers(qdr_core_t *core,
void *context,
@@ -495,7 +501,11 @@ void qdr_connection_handlers(qdr_core_t *core,
qdr_link_first_attach_t first_attach,
qdr_link_second_attach_t second_attach,
qdr_link_detach_t detach,
- qdr_link_flow_t flow);
+ qdr_link_flow_t flow,
+ qdr_link_offer_t offer,
+ qdr_link_drained_t drained,
+ qdr_link_push_t push,
+ qdr_link_deliver_t deliver);
/**
******************************************************************************
@@ -506,11 +516,14 @@ void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context);
void *qdr_delivery_get_context(qdr_delivery_t *delivery);
uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery);
bool qdr_delivery_is_settled(const qdr_delivery_t *delivery);
+void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *length);
+qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery);
void qdr_delivery_update_disposition(qdr_delivery_t *delivery);
void qdr_delivery_update_flow(qdr_delivery_t *delivery);
void qdr_delivery_process(qdr_delivery_t *delivery);
+
/**
******************************************************************************
* Management functions
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f0bfea6f/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 697b6ce..6adaa7e 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -118,6 +118,7 @@ int qdr_connection_process(qdr_connection_t *conn)
sys_mutex_lock(conn->work_lock);
DEQ_MOVE(conn->work_list, work_list);
+ // TODO - Grab the list of links with deliveries
sys_mutex_unlock(conn->work_lock);
int event_count = DEQ_SIZE(work_list);
@@ -146,6 +147,8 @@ int qdr_connection_process(qdr_connection_t *conn)
work = DEQ_HEAD(work_list);
}
+ // TODO - Invoke the push handler for each link with deliveries
+
return event_count;
}
@@ -268,7 +271,11 @@ void qdr_connection_handlers(qdr_core_t *core,
qdr_link_first_attach_t first_attach,
qdr_link_second_attach_t second_attach,
qdr_link_detach_t detach,
- qdr_link_flow_t flow)
+ qdr_link_flow_t flow,
+ qdr_link_offer_t offer,
+ qdr_link_drained_t drained,
+ qdr_link_push_t push,
+ qdr_link_deliver_t deliver)
{
core->user_context = context;
core->activate_handler = activate;
@@ -276,6 +283,10 @@ void qdr_connection_handlers(qdr_core_t *core,
core->second_attach_handler = second_attach;
core->detach_handler = detach;
core->flow_handler = flow;
+ core->offer_handler = offer;
+ core->drained_handler = drained;
+ core->push_handler = push;
+ core->deliver_handler = deliver;
}
@@ -699,7 +710,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
// to do an address lookup for deliveries that arrive on this link.
//
link->owning_addr = addr;
- qdr_add_link_ref(&addr->inlinks, link);
+ qdr_add_link_ref(&addr->inlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
qdr_link_outbound_second_attach_CT(core, link, source, target);
}
}
@@ -747,7 +758,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
// to do an address lookup for deliveries that arrive on this link.
//
link->owning_addr = addr;
- qdr_add_link_ref(&addr->rlinks, link);
+ qdr_add_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
if (DEQ_SIZE(addr->rlinks) == 1) {
const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
if (key && *key == 'M')
@@ -764,7 +775,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
case QD_LINK_CONTROL:
link->owning_addr = core->hello_addr;
- qdr_add_link_ref(&core->hello_addr->rlinks, link);
+ qdr_add_link_ref(&core->hello_addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
core->control_links_by_mask_bit[conn->mask_bit] = link;
qdr_link_outbound_second_attach_CT(core, link, source, target);
break;
@@ -836,7 +847,7 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
case QD_LINK_CONTROL:
link->owning_addr = core->hello_addr;
- qdr_add_link_ref(&core->hello_addr->rlinks, link);
+ qdr_add_link_ref(&core->hello_addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
core->control_links_by_mask_bit[conn->mask_bit] = link;
break;
@@ -885,7 +896,7 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
switch (link->link_type) {
case QD_LINK_ENDPOINT:
if (addr)
- qdr_del_link_ref(&addr->inlinks, link);
+ qdr_del_link_ref(&addr->inlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
break;
case QD_LINK_WAYPOINT:
@@ -904,7 +915,7 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
switch (link->link_type) {
case QD_LINK_ENDPOINT:
if (addr) {
- qdr_del_link_ref(&addr->rlinks, link);
+ qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
was_local = true;
}
break;
@@ -913,7 +924,7 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
break;
case QD_LINK_CONTROL:
- qdr_del_link_ref(&core->hello_addr->rlinks, link);
+ qdr_del_link_ref(&core->hello_addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
core->control_links_by_mask_bit[conn->mask_bit] = 0;
qdr_post_link_lost_CT(core, conn->mask_bit);
break;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f0bfea6f/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 85759b5..4fedce8 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -47,7 +47,7 @@ struct qdr_forwarder_t {
//==================================================================================
-qdr_delivery_t *qdr_forward_new_delivery(qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg)
+qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg)
{
qdr_delivery_t *dlv = new_qdr_delivery_t();
@@ -56,6 +56,7 @@ qdr_delivery_t *qdr_forward_new_delivery(qdr_delivery_t *peer, qdr_link_t *link,
dlv->peer = peer;
dlv->msg = qd_message_copy(msg);
dlv->settled = !peer || peer->settled;
+ dlv->tag = core->next_tag++;
if (peer->peer == 0)
peer->peer = dlv; // TODO - make this a back-list for multicast tracking
@@ -64,6 +65,24 @@ qdr_delivery_t *qdr_forward_new_delivery(qdr_delivery_t *peer, qdr_link_t *link,
}
+void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv)
+{
+ sys_mutex_lock(link->conn->work_lock);
+ DEQ_INSERT_TAIL(link->undelivered, dlv);
+ sys_mutex_unlock(link->conn->work_lock);
+
+ //
+ // If the link isn't already on the links_with_deliveries list, put it there.
+ //
+ qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
+
+ //
+ // Activate the outgoing connection for later processing.
+ //
+ qdr_connection_activate_CT(core, link->conn);
+}
+
+
int qdr_forward_multicast_CT(qdr_core_t *core,
qdr_address_t *addr,
qd_message_t *msg,
@@ -80,9 +99,8 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
while (link_ref) {
qdr_link_t *out_link = link_ref->link;
- qdr_delivery_t *out_delivery = qdr_forward_new_delivery(in_delivery, out_link, msg);
- DEQ_INSERT_TAIL(out_link->undelivered, out_delivery); // TODO - check locking on this list
- qdr_connection_activate_CT(core, out_link->conn);
+ qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
+ qdr_forward_deliver_CT(core, out_link, out_delivery);
fanout++;
link_ref = DEQ_NEXT(link_ref);
}
@@ -148,8 +166,8 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
core->control_links_by_mask_bit[link_bit] :
core->data_links_by_mask_bit[link_bit];
if (dest_link) {
- qdr_delivery_t *out_delivery = qdr_forward_new_delivery(in_delivery, dest_link, msg);
- DEQ_INSERT_TAIL(dest_link->undelivered, out_delivery); // TODO - check locking on this list
+ qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, dest_link, msg);
+ qdr_forward_deliver_CT(core, dest_link, out_delivery);
fanout++;
addr->deliveries_transit++;
qdr_connection_activate_CT(core, dest_link->conn);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f0bfea6f/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 359a192..479ba01 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -188,22 +188,25 @@ qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const cha
}
-void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link)
+void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls)
{
+ if (link->ref[cls] != 0)
+ return;
+
qdr_link_ref_t *ref = new_qdr_link_ref_t();
DEQ_ITEM_INIT(ref);
ref->link = link;
- link->ref = ref;
+ link->ref[cls] = ref;
DEQ_INSERT_TAIL(*ref_list, ref);
}
-void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link)
+void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls)
{
- if (link->ref) {
- DEQ_REMOVE(*ref_list, link->ref);
- free_qdr_link_ref_t(link->ref);
- link->ref = 0;
+ if (link->ref[cls]) {
+ DEQ_REMOVE(*ref_list, link->ref[cls]);
+ free_qdr_link_ref_t(link->ref[cls]);
+ link->ref[cls] = 0;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f0bfea6f/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 8d80bf5..0ef3db4 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -173,11 +173,15 @@ struct qdr_delivery_t {
qd_field_iterator_t *origin;
uint64_t disposition;
bool settled;
+ uint64_t tag;
};
ALLOC_DECLARE(qdr_delivery_t);
DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t);
+#define QDR_LINK_LIST_CLASS_ADDRESS 0
+#define QDR_LINK_LIST_CLASS_DELIVERY 1
+#define QDR_LINK_LIST_CLASSES 2
struct qdr_link_t {
DEQ_LINKS(qdr_link_t);
@@ -189,7 +193,7 @@ struct qdr_link_t {
char *name;
qdr_address_t *owning_addr; ///< [ref] Address record that owns this link
qdr_link_t *connected_link; ///< [ref] If this is a link-route, reference the connected link
- qdr_link_ref_t *ref; ///< Pointer to a containing reference object (TODO - check this!)
+ qdr_link_ref_t *ref[QDR_LINK_LIST_CLASSES]; ///< Pointers to containing reference objects
qdr_delivery_list_t undelivered; ///< Deliveries to be forwarded or sent
qdr_delivery_list_t unsettled; ///< Unsettled deliveries
bool strip_annotations_in;
@@ -266,8 +270,8 @@ DEQ_DECLARE(qdr_address_t, qdr_address_list_t);
qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_semantics_t semantics);
qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const char *addr, qd_address_semantics_t semantics);
-void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link);
-void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link);
+void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
+void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
void qdr_add_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
@@ -336,6 +340,7 @@ struct qdr_connection_t {
qdr_link_list_t links;
qdr_connection_work_list_t work_list;
sys_mutex_t *work_lock;
+ qdr_link_ref_list_t links_with_deliveries;
};
ALLOC_DECLARE(qdr_connection_t);
@@ -382,6 +387,10 @@ struct qdr_core_t {
qdr_link_second_attach_t second_attach_handler;
qdr_link_detach_t detach_handler;
qdr_link_flow_t flow_handler;
+ qdr_link_offer_t offer_handler;
+ qdr_link_drained_t drained_handler;
+ qdr_link_push_t push_handler;
+ qdr_link_deliver_t deliver_handler;
const char *router_area;
const char *router_id;
@@ -400,6 +409,8 @@ struct qdr_core_t {
qdr_link_t **control_links_by_mask_bit;
qdr_link_t **data_links_by_mask_bit;
+ uint64_t next_tag;
+
qdr_forwarder_t *forwarders[QD_SEMANTICS_LINK_BALANCED + 1];
};
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f0bfea6f/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 1c56f13..c459628 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -75,10 +75,65 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg)
{
// TODO - Implement this. Bypass the CT?
+
+ //
+ // We might wish to run link-routed transfers and updates through the core in order to
+ // track the number of outstanding deliveries and to have the ability to intervene in
+ // flow control.
+ //
+ // Use case: Quiescing a broker. To do this, all inbound links to the broker shall be
+ // idled by preventing the propagation of flow credit out of the broker. This will dry
+ // the transfer of inbound deliveries, allow all existing deliveries to be settled, and
+ // allow the router to know when it is safe to detach the inbound links. Outbound links
+ // can also be detached after all deliveries are settled and "drained" indications are
+ // received.
+ //
+ // Waypoint disconnect procedure:
+ // 1) Block flow-credit propagation for link outbound to waypoint.
+ // 2) Wait for the number of unsettled outbound deliveries to go to zero.
+ // 3) Detach the outbound link.
+ // 4) Wait for inbound link to be drained with zero unsettled deliveries.
+ // 5) Detach inbound link.
+ //
+
return 0;
}
+void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
+{
+ qdr_connection_t *conn = link->conn;
+ qdr_delivery_t *dlv;
+ bool drained = false;
+ int offer;
+
+ while (credit > 0 && !drained) {
+ sys_mutex_lock(conn->work_lock);
+ dlv = DEQ_HEAD(link->undelivered);
+ if (dlv) {
+ DEQ_REMOVE_HEAD(link->undelivered);
+ DEQ_INSERT_TAIL(link->unsettled, dlv);
+ credit--;
+ offer = DEQ_SIZE(link->undelivered);
+ } else
+ drained = true;
+ sys_mutex_unlock(conn->work_lock);
+
+ if (dlv)
+ core->deliver_handler(core->user_context, link, dlv);
+ }
+
+ if (drained)
+ core->drained_handler(core->user_context, link);
+ else
+ core->offer_handler(core->user_context, link, offer);
+
+ //
+ // TODO - handle disposition/settlement updates
+ //
+}
+
+
void qdr_send_to1(qdr_core_t *core, qd_message_t *msg, qd_field_iterator_t *addr, bool exclude_inprocess, bool control)
{
qdr_action_t *action = qdr_action(qdr_send_to_CT, "send_to");
@@ -127,6 +182,19 @@ bool qdr_delivery_is_settled(const qdr_delivery_t *delivery)
}
+void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *length)
+{
+ *tag = (const char*) &delivery->tag;
+ *length = sizeof(uint64_t);
+}
+
+
+qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery)
+{
+ return delivery->msg;
+}
+
+
//==================================================================================
// In-Thread Functions
//==================================================================================
@@ -140,6 +208,12 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
qdr_link_t *link = dlv->link;
int count = 0;
+ //
+ // NOTE: The link->undelivered list does not need to be protected by the
+ // connection's work lock for incoming links. This protection is only
+ // needed for outgoing links.
+ //
+
if (DEQ_IS_EMPTY(link->undelivered)) {
qdr_address_t *addr = link->owning_addr;
if (!addr && dlv->to_addr) {
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f0bfea6f/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 1c7b0dd..c6c3403 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -622,6 +622,47 @@ static void qd_router_link_flow(void *context, qdr_link_t *link)
}
+static void qd_router_link_offer(void *context, qdr_link_t *link, int delivery_count)
+{
+}
+
+
+static void qd_router_link_drained(void *context, qdr_link_t *link)
+{
+}
+
+
+static void qd_router_link_push(void *context, qdr_link_t *link)
+{
+ qd_router_t *router = (qd_router_t*) context;
+ qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
+ pn_link_t *plink = qd_link_pn(qlink);
+ int link_credit = pn_link_credit(plink);
+
+ qdr_link_process_deliveries(router->router_core, link, link_credit);
+}
+
+
+static void qd_router_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *dlv)
+{
+ qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
+ pn_link_t *plink = qd_link_pn(qlink);
+ const char *tag;
+ int tag_length;
+
+ qdr_delivery_tag(dlv, &tag, &tag_length);
+
+ pn_delivery(plink, pn_dtag(tag, tag_length));
+ pn_delivery_t *pdlv = pn_link_current(plink);
+
+ pn_delivery_set_context(pdlv, dlv);
+ qdr_delivery_set_context(dlv, pdlv);
+
+ qd_message_send(qdr_delivery_message(dlv), qlink, qdr_link_strip_annotations_out(link));
+ pn_link_advance(plink);
+}
+
+
void qd_router_setup_late(qd_dispatch_t *qd)
{
qd->router->router_core = qdr_core(qd, qd->router->router_area, qd->router->router_id);
@@ -631,7 +672,11 @@ void qd_router_setup_late(qd_dispatch_t *qd)
qd_router_link_first_attach,
qd_router_link_second_attach,
qd_router_link_detach,
- qd_router_link_flow);
+ qd_router_link_flow,
+ qd_router_link_offer,
+ qd_router_link_drained,
+ qd_router_link_push,
+ qd_router_link_deliver);
qd_router_python_setup(qd->router);
qd_timer_schedule(qd->router->timer, 1000);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/3] qpid-dispatch git commit: DISPATCH-179 - Added delivery and flow linkage between core and node.
Posted by tr...@apache.org.
DISPATCH-179 - Added delivery and flow linkage between core and node.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/88842e3b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/88842e3b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/88842e3b
Branch: refs/heads/tross-DISPATCH-179-1
Commit: 88842e3b774d75206086687aac7c5e7785653a24
Parents: f0bfea6
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Jan 14 12:42:17 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Jan 14 12:42:17 2016 -0500
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 2 +-
src/router_core/connections.c | 34 +++++++++++++++++++++++++-----
src/router_core/forwarder.c | 26 +++++++++++++++++++++--
src/router_core/router_core_private.h | 17 +++++++++++----
src/router_core/transfer.c | 28 ++++++++++++++++++++++++
src/router_node.c | 11 ++++++++--
tests/config-2/A.conf | 5 +++++
7 files changed, 109 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/88842e3b/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index b0119eb..9cd97e2 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -489,7 +489,7 @@ typedef void (*qdr_link_first_attach_t) (void *context, qdr_connection_t *conn,
typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link,
qdr_terminus_t *source, qdr_terminus_t *target);
typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, qdr_error_t *error);
-typedef void (*qdr_link_flow_t) (void *context, qdr_link_t *link);
+typedef void (*qdr_link_flow_t) (void *context, qdr_link_t *link, int credit);
typedef void (*qdr_link_offer_t) (void *context, qdr_link_t *link, int delivery_count);
typedef void (*qdr_link_drained_t) (void *context, qdr_link_t *link);
typedef void (*qdr_link_push_t) (void *context, qdr_link_t *link);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/88842e3b/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 6adaa7e..762570d 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -114,11 +114,14 @@ void *qdr_connection_get_context(const qdr_connection_t *conn)
int qdr_connection_process(qdr_connection_t *conn)
{
qdr_connection_work_list_t work_list;
+ qdr_link_ref_list_t links_with_deliveries;
+ qdr_link_ref_list_t links_with_credit;
qdr_core_t *core = conn->core;
sys_mutex_lock(conn->work_lock);
DEQ_MOVE(conn->work_list, work_list);
- // TODO - Grab the list of links with deliveries
+ DEQ_MOVE(conn->links_with_deliveries, links_with_deliveries);
+ DEQ_MOVE(conn->links_with_credit, links_with_credit);
sys_mutex_unlock(conn->work_lock);
int event_count = DEQ_SIZE(work_list);
@@ -147,7 +150,20 @@ int qdr_connection_process(qdr_connection_t *conn)
work = DEQ_HEAD(work_list);
}
- // TODO - Invoke the push handler for each link with deliveries
+ qdr_link_ref_t *ref = DEQ_HEAD(links_with_deliveries);
+ while (ref) {
+ core->push_handler(core->user_context, ref->link);
+ qdr_del_link_ref(&links_with_deliveries, ref->link, QDR_LINK_LIST_CLASS_DELIVERY);
+ ref = DEQ_HEAD(links_with_deliveries);
+ }
+
+ ref = DEQ_HEAD(links_with_credit);
+ while (ref) {
+ core->flow_handler(core->user_context, ref->link, ref->link->incremental_credit);
+ ref->link->incremental_credit = 0;
+ qdr_del_link_ref(&links_with_credit, ref->link, QDR_LINK_LIST_CLASS_FLOW);
+ ref = DEQ_HEAD(links_with_credit);
+ }
return event_count;
}
@@ -223,6 +239,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
link->conn = conn;
link->name = (char*) malloc(strlen(name));
strcpy(link->name, name);
+ link->link_direction = dir;
+ link->capacity = 32; // TODO - make this configurable
link->strip_annotations_in = conn->strip_annotations_in;
link->strip_annotations_out = conn->strip_annotations_out;
@@ -258,6 +276,7 @@ void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error)
{
qdr_action_t *action = qdr_action(qdr_link_inbound_detach_CT, "link_detach");
+ action->args.connection.conn = link->conn;
action->args.connection.link = link;
action->args.connection.error = error;
action->args.connection.dt = dt;
@@ -374,6 +393,7 @@ static qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
link->conn = conn;
link->link_type = link_type;
link->link_direction = dir;
+ link->capacity = 32; // TODO - make this configurable
link->name = (char*) malloc(QDR_DISCRIMINATOR_SIZE + 8);
qdr_generate_link_name("qdlink", link->name, QDR_DISCRIMINATOR_SIZE + 8);
@@ -682,6 +702,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
if (qdr_terminus_is_anonymous(target)) {
link->owning_addr = 0;
qdr_link_outbound_second_attach_CT(core, link, source, target);
+ qdr_link_issue_credit_CT(core, link, link->capacity);
} else {
//
@@ -712,6 +733,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
link->owning_addr = addr;
qdr_add_link_ref(&addr->inlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
qdr_link_outbound_second_attach_CT(core, link, source, target);
+ qdr_link_issue_credit_CT(core, link, link->capacity);
}
}
break;
@@ -723,10 +745,12 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
case QD_LINK_CONTROL:
qdr_link_outbound_second_attach_CT(core, link, source, target);
+ qdr_link_issue_credit_CT(core, link, link->capacity);
break;
case QD_LINK_ROUTER:
qdr_link_outbound_second_attach_CT(core, link, source, target);
+ qdr_link_issue_credit_CT(core, link, link->capacity);
break;
}
} else {
@@ -811,16 +835,16 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
if (discard)
return;
- qdr_connection_t *conn = action->args.connection.conn;
qdr_link_t *link = action->args.connection.link;
- qd_direction_t dir = action->args.connection.dir;
+ qdr_connection_t *conn = link->conn;
qdr_terminus_t *source = action->args.connection.source;
qdr_terminus_t *target = action->args.connection.target;
- if (dir == QD_INCOMING) {
+ if (link->link_direction == QD_INCOMING) {
//
// Handle incoming link cases
//
+ qdr_link_issue_credit_CT(core, link, link->capacity);
switch (link->link_type) {
case QD_LINK_ENDPOINT:
break;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/88842e3b/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 4fedce8..92c3074 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -58,7 +58,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *pe
dlv->settled = !peer || peer->settled;
dlv->tag = core->next_tag++;
- if (peer->peer == 0)
+ if (peer && peer->peer == 0)
peer->peer = dlv; // TODO - make this a back-list for multicast tracking
return dlv;
@@ -69,12 +69,12 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *
{
sys_mutex_lock(link->conn->work_lock);
DEQ_INSERT_TAIL(link->undelivered, dlv);
- sys_mutex_unlock(link->conn->work_lock);
//
// If the link isn't already on the links_with_deliveries list, put it there.
//
qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
+ sys_mutex_unlock(link->conn->work_lock);
//
// Activate the outgoing connection for later processing.
@@ -83,6 +83,23 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *
}
+void qdr_forward_on_message(qdr_core_t *core, qdr_general_work_t *work)
+{
+ work->on_message(work->on_message_context, work->msg, work->maskbit);
+}
+
+
+void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_link_t *link, qd_message_t *msg)
+{
+ qdr_general_work_t *work = qdr_general_work(qdr_forward_on_message);
+ work->on_message = sub->on_message;
+ work->on_message_context = sub->on_message_context;
+ work->msg = qd_message_copy(msg);
+ work->maskbit = link ? link->conn->mask_bit : 0;
+ qdr_post_general_work_CT(core, work);
+}
+
+
int qdr_forward_multicast_CT(qdr_core_t *core,
qdr_address_t *addr,
qd_message_t *msg,
@@ -181,6 +198,11 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
//
// Forward to in-process subscribers
//
+ qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions);
+ while (sub) {
+ qdr_forward_on_message_CT(core, sub, in_delivery ? in_delivery->link : 0, msg);
+ sub = DEQ_NEXT(sub);
+ }
}
return fanout;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/88842e3b/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 0ef3db4..be006b8 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -181,7 +181,8 @@ DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t);
#define QDR_LINK_LIST_CLASS_ADDRESS 0
#define QDR_LINK_LIST_CLASS_DELIVERY 1
-#define QDR_LINK_LIST_CLASSES 2
+#define QDR_LINK_LIST_CLASS_FLOW 2
+#define QDR_LINK_LIST_CLASSES 3
struct qdr_link_t {
DEQ_LINKS(qdr_link_t);
@@ -198,6 +199,9 @@ struct qdr_link_t {
qdr_delivery_list_t unsettled; ///< Unsettled deliveries
bool strip_annotations_in;
bool strip_annotations_out;
+ int capacity;
+ int incremental_credit_CT;
+ int incremental_credit;
};
ALLOC_DECLARE(qdr_link_t);
@@ -211,6 +215,9 @@ struct qdr_link_ref_t {
ALLOC_DECLARE(qdr_link_ref_t);
DEQ_DECLARE(qdr_link_ref_t, qdr_link_ref_list_t);
+void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
+void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
+
struct qdr_lrp_t {
DEQ_LINKS(qdr_lrp_t);
@@ -270,9 +277,6 @@ DEQ_DECLARE(qdr_address_t, qdr_address_list_t);
qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_semantics_t semantics);
qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const char *addr, qd_address_semantics_t semantics);
-void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
-void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
-
void qdr_add_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
@@ -293,6 +297,9 @@ struct qdr_general_work_t {
qdr_general_work_handler_t handler;
qdr_field_t *field;
int maskbit;
+ qdr_receive_t on_message;
+ void *on_message_context;
+ qd_message_t *msg;
};
ALLOC_DECLARE(qdr_general_work_t);
@@ -341,6 +348,7 @@ struct qdr_connection_t {
qdr_connection_work_list_t work_list;
sys_mutex_t *work_lock;
qdr_link_ref_list_t links_with_deliveries;
+ qdr_link_ref_list_t links_with_credit;
};
ALLOC_DECLARE(qdr_connection_t);
@@ -426,6 +434,7 @@ void qdr_agent_setup_CT(qdr_core_t *core);
void qdr_forwarder_setup_CT(qdr_core_t *core);
qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char *label);
void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
+void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit);
void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/88842e3b/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index c459628..a144adb 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -199,6 +199,34 @@ qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery)
// In-Thread Functions
//==================================================================================
+/**
+ * Add 'credit' to the link's core-thread accumulator (incremental_credit_CT). If the batch previously given to the
+ * connection thread has been issued to Proton (incremental_credit == 0), hand over the accumulated batch and activate the connection.
+ */
+void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit)
+{
+ link->incremental_credit_CT += credit;
+
+ if (link->incremental_credit_CT && link->incremental_credit == 0) {
+ //
+ // Move the credit from the core-thread value to the connection-thread value.
+ //
+ link->incremental_credit = link->incremental_credit_CT;
+ link->incremental_credit_CT = 0;
+
+ //
+ // Put this link on the connection's has-credit list.
+ //
+ qdr_add_link_ref(&link->conn->links_with_credit, link, QDR_LINK_LIST_CLASS_FLOW);
+
+ //
+ // Activate the connection
+ //
+ qdr_connection_activate_CT(core, link->conn);
+ }
+}
+
+
static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
if (discard)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/88842e3b/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index c6c3403..c6ecb7a 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -285,8 +285,10 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd
if (!addr_iter)
addr_iter = qd_message_field_iterator(msg, QD_FIELD_TO);
- if (addr_iter)
+ if (addr_iter) {
+ qd_address_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd));
+ }
} else
delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd));
@@ -617,8 +619,12 @@ static void qd_router_link_detach(void *context, qdr_link_t *link, qdr_error_t *
}
-static void qd_router_link_flow(void *context, qdr_link_t *link)
+static void qd_router_link_flow(void *context, qdr_link_t *link, int credit)
{
+ qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
+ pn_link_t *plink = qd_link_pn(qlink);
+
+ pn_link_flow(plink, credit);
}
@@ -634,6 +640,7 @@ static void qd_router_link_drained(void *context, qdr_link_t *link)
static void qd_router_link_push(void *context, qdr_link_t *link)
{
+ printf("qd_router_link_push\n");
qd_router_t *router = (qd_router_t*) context;
qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
pn_link_t *plink = qd_link_pn(qlink);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/88842e3b/tests/config-2/A.conf
----------------------------------------------------------------------
diff --git a/tests/config-2/A.conf b/tests/config-2/A.conf
index a5b948d..4a5ccc9 100644
--- a/tests/config-2/A.conf
+++ b/tests/config-2/A.conf
@@ -91,3 +91,8 @@ log {
enable: trace+
}
+log {
+ module: ROUTER_HELLO
+ enable: trace+
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/3] qpid-dispatch git commit: DISPATCH-179 - Presettled Multicast
messages now flowing correctly. Routers discover each other.
Posted by tr...@apache.org.
DISPATCH-179 - Presettled Multicast messages now flowing correctly.
Routers discover each other.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/6d9dc9e9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/6d9dc9e9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/6d9dc9e9
Branch: refs/heads/tross-DISPATCH-179-1
Commit: 6d9dc9e9d1bb90f3e189e4e8fde053dee4b7299b
Parents: 88842e3
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Jan 15 13:53:55 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Jan 15 13:53:55 2016 -0500
----------------------------------------------------------------------
include/qpid/dispatch/message.h | 2 +-
src/message.c | 4 ++--
src/python_embedded.c | 25 ++++++++++++-------------
src/router_core/connections.c | 3 +++
src/router_core/forwarder.c | 18 +++++++++++-------
src/router_core/router_core.c | 1 +
src/router_core/router_core_private.h | 1 +
src/router_core/transfer.c | 6 ++++--
src/router_node.c | 16 ++++++++++------
tests/config-2/B.conf | 12 +++++++++++-
10 files changed, 56 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6d9dc9e9/include/qpid/dispatch/message.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 9f9f0a9..7ab7b9c 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -167,7 +167,7 @@ void qd_message_set_trace_annotation(qd_message_t *msg, qd_composed_field_t *tra
* method must not reference it after this call.
*
* @param msg Pointer to an outgoing message.
- * @param to_field Pointer to a composed field representing the to overrid
+ * @param to_field Pointer to a composed field representing the to override
* address that will be used as the value for the QD_MA_TO map entry. If null,
* the message will not have a QA_MA_TO message annotation field. Ownership of
* this field is transferred to the message.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6d9dc9e9/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 6c375a7..7c7bdb2 100644
--- a/src/message.c
+++ b/src/message.c
@@ -793,8 +793,8 @@ static bool compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t
}
void qd_message_send(qd_message_t *in_msg,
- qd_link_t *link,
- bool strip_annotations)
+ qd_link_t *link,
+ bool strip_annotations)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_message_content_t *content = msg->content;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6d9dc9e9/src/python_embedded.c
----------------------------------------------------------------------
diff --git a/src/python_embedded.c b/src/python_embedded.c
index e5b88f8..669288a 100644
--- a/src/python_embedded.c
+++ b/src/python_embedded.c
@@ -557,19 +557,6 @@ static void IoAdapter_dealloc(IoAdapter* self)
static qd_error_t compose_python_message(qd_composed_field_t **field, PyObject *message,
qd_dispatch_t* qd) {
- *field = qd_compose(QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, *field);
- qd_compose_start_map(*field);
-
- qd_compose_insert_symbol(*field, QD_MA_INGRESS);
- qd_compose_insert_string(*field, qd_router_id(qd));
-
- qd_compose_insert_symbol(*field, QD_MA_TRACE);
- qd_compose_start_list(*field);
- qd_compose_insert_string(*field, qd_router_id(qd));
- qd_compose_end_list(*field);
-
- qd_compose_end_map(*field);
-
*field = qd_compose(QD_PERFORMATIVE_PROPERTIES, *field);
qd_compose_start_list(*field);
qd_compose_insert_null(*field); // message-id
@@ -603,6 +590,18 @@ static PyObject *qd_python_send(PyObject *self, PyObject *args)
if (compose_python_message(&field, message, ioa->qd) == QD_ERROR_NONE) {
qd_message_t *msg = qd_message();
qd_message_compose_2(msg, field);
+
+ qd_composed_field_t *ingress = qd_compose_subfield(0);
+ qd_compose_insert_string(ingress, qd_router_id(ioa->qd));
+
+ qd_composed_field_t *trace = qd_compose_subfield(0);
+ qd_compose_start_list(trace);
+ qd_compose_insert_string(trace, qd_router_id(ioa->qd));
+ qd_compose_end_list(trace);
+
+ qd_message_set_ingress_annotation(msg, ingress);
+ qd_message_set_trace_annotation(msg, trace);
+
PyObject *address = PyObject_GetAttrString(message, "address");
if (address) {
qdr_send_to2(ioa->core, msg, PyString_AsString(address), (bool) no_echo, (bool) control);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6d9dc9e9/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 762570d..0833214 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -397,6 +397,9 @@ static qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
link->name = (char*) malloc(QDR_DISCRIMINATOR_SIZE + 8);
qdr_generate_link_name("qdlink", link->name, QDR_DISCRIMINATOR_SIZE + 8);
+ link->strip_annotations_in = conn->strip_annotations_in;
+ link->strip_annotations_out = conn->strip_annotations_out;
+
qdr_connection_work_t *work = new_qdr_connection_work_t();
ZERO(work);
work->work_type = QDR_CONNECTION_WORK_FIRST_ATTACH;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6d9dc9e9/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 92c3074..5ba86fd 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -113,13 +113,15 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
//
// Forward to local subscribers
//
- qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
- while (link_ref) {
- qdr_link_t *out_link = link_ref->link;
- qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
- qdr_forward_deliver_CT(core, out_link, out_delivery);
- fanout++;
- link_ref = DEQ_NEXT(link_ref);
+ if (!addr->local || exclude_inprocess) {
+ qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
+ while (link_ref) {
+ qdr_link_t *out_link = link_ref->link;
+ qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
+ qdr_forward_deliver_CT(core, out_link, out_delivery);
+ fanout++;
+ link_ref = DEQ_NEXT(link_ref);
+ }
}
//
@@ -201,10 +203,12 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions);
while (sub) {
qdr_forward_on_message_CT(core, sub, in_delivery ? in_delivery->link : 0, msg);
+ fanout++;
sub = DEQ_NEXT(sub);
}
}
+
return fanout;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6d9dc9e9/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 479ba01..bf8be0e 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -182,6 +182,7 @@ qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const cha
DEQ_ITEM_INIT(addr);
DEQ_INSERT_TAIL(core->addrs, addr);
addr->block_deletion = true;
+ addr->local = (aclass == 'L');
}
qd_field_iterator_free(iter);
return addr;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6d9dc9e9/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index be006b8..9b0194f 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -260,6 +260,7 @@ struct qdr_address_t {
bool toggle;
bool waypoint;
bool block_deletion;
+ bool local;
/**@name Statistics */
///@{
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6d9dc9e9/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index a144adb..0dec0cd 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -252,20 +252,22 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
}
if (count == 0) {
- if (link->owning_addr)
+ if (link->owning_addr) {
//
// Message was not delivered and the link is not anonymous.
// Queue the message for later delivery (when the address gets
// a valid destination).
//
DEQ_INSERT_TAIL(link->undelivered, dlv);
- else {
+ } else {
//
// TODO - Release the delivery
//
}
} else if (count == 1) {
if (qdr_delivery_is_settled(dlv))
+ qdr_link_issue_credit_CT(core, link, 1);
+ else
DEQ_INSERT_TAIL(link->unsettled, dlv);
} else {
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6d9dc9e9/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index c6ecb7a..8e2dfab 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -75,17 +75,20 @@ static void qd_router_connection_get_config(const qd_connection_t *conn,
{
if (conn) {
const qd_server_config_t *cf = qd_connection_config(conn);
- if (cf && strcmp(cf->role, router_role) == 0)
+
+ *strip_annotations_in = cf->strip_inbound_annotations;
+ *strip_annotations_out = cf->strip_outbound_annotations;
+
+ if (cf && strcmp(cf->role, router_role) == 0) {
+ *strip_annotations_in = false;
+ *strip_annotations_out = false;
*role = QDR_ROLE_INTER_ROUTER;
- else if (cf && strcmp(cf->role, on_demand_role) == 0)
+ } else if (cf && strcmp(cf->role, on_demand_role) == 0)
*role = QDR_ROLE_ON_DEMAND;
else
*role = QDR_ROLE_NORMAL;
*label = cf->label;
-
- *strip_annotations_in = cf->strip_inbound_annotations;
- *strip_annotations_out = cf->strip_outbound_annotations;
}
}
@@ -640,7 +643,6 @@ static void qd_router_link_drained(void *context, qdr_link_t *link)
static void qd_router_link_push(void *context, qdr_link_t *link)
{
- printf("qd_router_link_push\n");
qd_router_t *router = (qd_router_t*) context;
qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
pn_link_t *plink = qd_link_pn(qlink);
@@ -666,6 +668,8 @@ static void qd_router_link_deliver(void *context, qdr_link_t *link, qdr_delivery
qdr_delivery_set_context(dlv, pdlv);
qd_message_send(qdr_delivery_message(dlv), qlink, qdr_link_strip_annotations_out(link));
+ if (qdr_delivery_is_settled(dlv))
+ pn_delivery_settle(pdlv);
pn_link_advance(plink);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6d9dc9e9/tests/config-2/B.conf
----------------------------------------------------------------------
diff --git a/tests/config-2/B.conf b/tests/config-2/B.conf
index fb0ca78..4d12c62 100644
--- a/tests/config-2/B.conf
+++ b/tests/config-2/B.conf
@@ -87,7 +87,17 @@ fixedAddress {
}
log {
- module: ROUTER_CORE
+ module: ROUTER_LS
+ enable: trace+
+}
+
+log {
+ module: ROUTER_MA
+ enable: trace+
+}
+
+log {
+ module: ROUTER_HELLO
enable: trace+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org