You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/10/11 21:21:31 UTC
qpid-dispatch git commit: DISPATCH-1110 - Added code to synchronously
call AMQP-rx_handler to pull in all data from the proton buffers at once.
Also introduced link level flag to continue receiving without boundaries upon
link detach arrival
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 4a232b674 -> 1ff669e77
DISPATCH-1110 - Added code to synchronously call AMQP-rx_handler to pull in all data from the proton buffers at once. Also introduced link level flag to continue receiving without boundaries upon link detach arrival
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/1ff669e7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/1ff669e7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/1ff669e7
Branch: refs/heads/master
Commit: 1ff669e77c13a515de084281067e8dbe10cdb571
Parents: 4a232b6
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Wed Oct 10 13:29:41 2018 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Thu Oct 11 17:15:07 2018 -0400
----------------------------------------------------------------------
include/qpid/dispatch/container.h | 4 +++-
src/container.c | 18 ++++++++++++++++-
src/message.c | 14 +++++++++-----
src/router_node.c | 35 +++++++++++++++++++++-------------
4 files changed, 51 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ff669e7/include/qpid/dispatch/container.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h
index aa2d12b..047ac6e 100644
--- a/include/qpid/dispatch/container.h
+++ b/include/qpid/dispatch/container.h
@@ -71,7 +71,7 @@ typedef enum {
typedef struct qd_node_t qd_node_t;
typedef struct qd_link_t qd_link_t;
-typedef void (*qd_container_delivery_handler_t) (void *node_context, qd_link_t *link);
+typedef bool (*qd_container_delivery_handler_t) (void *node_context, qd_link_t *link);
typedef void (*qd_container_disposition_handler_t) (void *node_context, qd_link_t *link, pn_delivery_t *pnd);
typedef int (*qd_container_link_handler_t) (void *node_context, qd_link_t *link);
typedef int (*qd_container_link_detach_handler_t) (void *node_context, qd_link_t *link, qd_detach_type_t dt);
@@ -179,6 +179,8 @@ void *qd_link_get_context(qd_link_t *link);
void policy_notify_opened(void *container, qd_connection_t *conn, void *context);
qd_direction_t qd_link_direction(const qd_link_t *link);
+bool qd_link_is_q2_limit_unbounded(qd_link_t *link);
+void qd_link_set_q2_limit_unbounded(qd_link_t *link, bool q2_limit_unbounded);
pn_snd_settle_mode_t qd_link_remote_snd_settle_mode(const qd_link_t *link);
qd_connection_t *qd_link_connection(qd_link_t *link);
pn_link_t *qd_link_pn(qd_link_t *link);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ff669e7/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index c23f951..d0e3e5e 100644
--- a/src/container.c
+++ b/src/container.c
@@ -61,6 +61,7 @@ struct qd_link_t {
bool drain_mode;
pn_snd_settle_mode_t remote_snd_settle_mode;
qd_link_ref_list_t ref_list;
+ bool q2_limit_unbounded;
};
DEQ_DECLARE(qd_link_t, qd_link_list_t);
@@ -185,7 +186,10 @@ static void do_receive(pn_delivery_t *pnd)
if (link) {
qd_node_t *node = link->node;
if (node) {
- node->ntype->rx_handler(node->context, link);
+ while (true) {
+ if (!node->ntype->rx_handler(node->context, link))
+ break;
+ }
return;
}
}
@@ -903,6 +907,18 @@ pn_session_t *qd_link_pn_session(qd_link_t *link)
}
+bool qd_link_is_q2_limit_unbounded(qd_link_t *link)
+{
+ return link->q2_limit_unbounded;
+}
+
+
+void qd_link_set_q2_limit_unbounded(qd_link_t *link, bool q2_limit_unbounded)
+{
+ link->q2_limit_unbounded = q2_limit_unbounded;
+}
+
+
qd_direction_t qd_link_direction(const qd_link_t *link)
{
return link->direction;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ff669e7/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 5c836ab..a3682c1 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1182,6 +1182,7 @@ qd_message_t * qd_get_message_context(pn_delivery_t *delivery)
qd_message_t *qd_message_receive(pn_delivery_t *delivery)
{
pn_link_t *link = pn_delivery_link(delivery);
+ qd_link_t *qdl = (qd_link_t *)pn_link_get_context(link);
ssize_t rc;
pn_record_t *record = pn_delivery_attachments(delivery);
@@ -1216,9 +1217,10 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
// have been processed and freed by outbound processing then
// message holdoff is cleared and receiving may continue.
//
- if (!msg->content->disable_q2_holdoff) {
- if (msg->content->q2_input_holdoff)
+ if (!qd_link_is_q2_limit_unbounded(qdl) && !msg->content->disable_q2_holdoff) {
+ if (msg->content->q2_input_holdoff) {
return (qd_message_t*)msg;
+ }
}
// Loop until msg is complete, error seen, or incoming bytes are consumed
@@ -1275,9 +1277,11 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
DEQ_INSERT_TAIL(msg->content->buffers, msg->content->pending);
msg->content->pending = 0;
if (qd_message_Q2_holdoff_should_block((qd_message_t *)msg)) {
- msg->content->q2_input_holdoff = true;
- UNLOCK(msg->content->lock);
- break;
+ if (!qd_link_is_q2_limit_unbounded(qdl)) {
+ msg->content->q2_input_holdoff = true;
+ UNLOCK(msg->content->lock);
+ break;
+ }
}
UNLOCK(msg->content->lock);
msg->content->pending = qd_buffer();
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ff669e7/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index cc704c1..d19afb9 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -305,18 +305,19 @@ static void log_link_message(qd_connection_t *conn, pn_link_t *pn_link, qd_messa
/**
* Inbound Delivery Handler
*/
-static void AMQP_rx_handler(void* context, qd_link_t *link)
+static bool AMQP_rx_handler(void* context, qd_link_t *link)
{
qd_router_t *router = (qd_router_t*) context;
pn_link_t *pn_link = qd_link_pn(link);
+ bool next_delivery = false;
assert(pn_link);
if (!pn_link)
- return;
+ return next_delivery;
pn_delivery_t *pnd = pn_link_current(pn_link);
if (!pnd)
- return;
+ return next_delivery;
qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
qd_connection_t *conn = qd_link_connection(link);
qdr_delivery_t *delivery = qdr_node_delivery_qdr_from_pn(pnd);
@@ -349,11 +350,11 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
// If there's another delivery pending then reschedule this.
pn_delivery_t *npnd = pn_link_current(pn_link);
if (npnd) {
- qd_connection_invoke_deferred(conn, deferred_AMQP_rx_handler, link);
+ next_delivery = true;
}
if (qd_message_is_discard(msg)) {
- return;
+ return next_delivery;
}
}
@@ -364,7 +365,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
if (!rlink) {
if (receive_complete) // The entire message has been received but there is nowhere to send it to, free it and do nothing.
qd_message_free(msg);
- return;
+ return next_delivery;
}
//
@@ -405,7 +406,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver_to_routed_link");
}
- return;
+ return next_delivery;
}
//
@@ -458,7 +459,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
}
if (!valid_message) {
- return;
+ return next_delivery;
}
if (delivery) {
@@ -476,7 +477,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
}
}
- return;
+ return next_delivery;
}
if (check_user) {
@@ -494,7 +495,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
pn_delivery_settle(pnd);
qd_message_free(msg);
qd_iterator_free(userid_iter);
- return;
+ return next_delivery;
}
}
qd_iterator_free(userid_iter);
@@ -520,7 +521,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
pn_delivery_update(pnd, PN_RELEASED);
pn_delivery_settle(pnd);
qd_message_free(msg);
- return;
+ return next_delivery;
}
if (anonymous_link) {
@@ -596,6 +597,8 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions, ingress_index);
}
+
+
if (delivery) {
//
// Settle the proton delivery only if all the data has arrived
@@ -604,7 +607,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
if (receive_complete) {
pn_delivery_settle(pnd);
qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver");
- return;
+ return next_delivery;
}
}
@@ -620,6 +623,8 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
pn_delivery_settle(pnd);
qd_message_free(msg);
}
+
+ return next_delivery;
}
@@ -631,7 +636,10 @@ static void deferred_AMQP_rx_handler(void *context, bool discard) {
qd_link_t *qdl = (qd_link_t*)context;
qd_router_t *qdr = (qd_router_t *)qd_link_get_node_context(qdl);
assert(qdr != 0);
- AMQP_rx_handler(qdr, qdl);
+ while (true) {
+ if (! AMQP_rx_handler(qdr, qdl))
+ break;
+ }
}
}
@@ -781,6 +789,7 @@ static int AMQP_link_detach_handler(void* context, qd_link_t *link, qd_detach_ty
qd_message_t *msg = qd_get_message_context(pnd);
if (msg) {
if (!qd_message_receive_complete(msg)) {
+ qd_link_set_q2_limit_unbounded(link, true);
qd_message_Q2_holdoff_disable(msg);
deferred_AMQP_rx_handler((void *)link, false);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org