You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/10/11 21:21:31 UTC

qpid-dispatch git commit: DISPATCH-1110 - Added code to synchronously call AMQP_rx_handler to pull in all data from the proton buffers at once. Also introduced a link-level flag (q2_limit_unbounded) to continue receiving without the Q2 holdoff limit once a link detach arrives.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 4a232b674 -> 1ff669e77


DISPATCH-1110 - Added code to synchronously call AMQP_rx_handler to pull in all data from the proton buffers at once. Also introduced a link-level flag (q2_limit_unbounded) to continue receiving without the Q2 holdoff limit once a link detach arrives.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/1ff669e7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/1ff669e7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/1ff669e7

Branch: refs/heads/master
Commit: 1ff669e77c13a515de084281067e8dbe10cdb571
Parents: 4a232b6
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Wed Oct 10 13:29:41 2018 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Thu Oct 11 17:15:07 2018 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/container.h |  4 +++-
 src/container.c                   | 18 ++++++++++++++++-
 src/message.c                     | 14 +++++++++-----
 src/router_node.c                 | 35 +++++++++++++++++++++-------------
 4 files changed, 51 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ff669e7/include/qpid/dispatch/container.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h
index aa2d12b..047ac6e 100644
--- a/include/qpid/dispatch/container.h
+++ b/include/qpid/dispatch/container.h
@@ -71,7 +71,7 @@ typedef enum {
 typedef struct qd_node_t     qd_node_t;
 typedef struct qd_link_t     qd_link_t;
 
-typedef void (*qd_container_delivery_handler_t)    (void *node_context, qd_link_t *link);
+typedef bool (*qd_container_delivery_handler_t)    (void *node_context, qd_link_t *link);
 typedef void (*qd_container_disposition_handler_t) (void *node_context, qd_link_t *link, pn_delivery_t *pnd);
 typedef int  (*qd_container_link_handler_t)        (void *node_context, qd_link_t *link);
 typedef int  (*qd_container_link_detach_handler_t) (void *node_context, qd_link_t *link, qd_detach_type_t dt);
@@ -179,6 +179,8 @@ void *qd_link_get_context(qd_link_t *link);
 
 void policy_notify_opened(void *container, qd_connection_t *conn, void *context);
 qd_direction_t qd_link_direction(const qd_link_t *link);
+bool qd_link_is_q2_limit_unbounded(qd_link_t *link);
+void qd_link_set_q2_limit_unbounded(qd_link_t *link, bool q2_limit_unbounded);
 pn_snd_settle_mode_t qd_link_remote_snd_settle_mode(const qd_link_t *link);
 qd_connection_t *qd_link_connection(qd_link_t *link);
 pn_link_t *qd_link_pn(qd_link_t *link);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ff669e7/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index c23f951..d0e3e5e 100644
--- a/src/container.c
+++ b/src/container.c
@@ -61,6 +61,7 @@ struct qd_link_t {
     bool                        drain_mode;
     pn_snd_settle_mode_t        remote_snd_settle_mode;
     qd_link_ref_list_t          ref_list;
+    bool                        q2_limit_unbounded;
 };
 
 DEQ_DECLARE(qd_link_t, qd_link_list_t);
@@ -185,7 +186,10 @@ static void do_receive(pn_delivery_t *pnd)
     if (link) {
         qd_node_t *node = link->node;
         if (node) {
-            node->ntype->rx_handler(node->context, link);
+            while (true) {
+                if (!node->ntype->rx_handler(node->context, link))
+                    break;
+            }
             return;
         }
     }
@@ -903,6 +907,18 @@ pn_session_t *qd_link_pn_session(qd_link_t *link)
 }
 
 
+bool qd_link_is_q2_limit_unbounded(qd_link_t *link)
+{
+    return link->q2_limit_unbounded;
+}
+
+
+void qd_link_set_q2_limit_unbounded(qd_link_t *link, bool q2_limit_unbounded)
+{
+    link->q2_limit_unbounded = q2_limit_unbounded;
+}
+
+
 qd_direction_t qd_link_direction(const qd_link_t *link)
 {
     return link->direction;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ff669e7/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 5c836ab..a3682c1 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1182,6 +1182,7 @@ qd_message_t * qd_get_message_context(pn_delivery_t *delivery)
 qd_message_t *qd_message_receive(pn_delivery_t *delivery)
 {
     pn_link_t        *link = pn_delivery_link(delivery);
+    qd_link_t       *qdl = (qd_link_t *)pn_link_get_context(link);
     ssize_t           rc;
 
     pn_record_t *record    = pn_delivery_attachments(delivery);
@@ -1216,9 +1217,10 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
     //      have been processed and freed by outbound processing then
     //      message holdoff is cleared and receiving may continue.
     //
-    if (!msg->content->disable_q2_holdoff) {
-        if (msg->content->q2_input_holdoff)
+    if (!qd_link_is_q2_limit_unbounded(qdl) && !msg->content->disable_q2_holdoff) {
+        if (msg->content->q2_input_holdoff) {
             return (qd_message_t*)msg;
+        }
     }
 
     // Loop until msg is complete, error seen, or incoming bytes are consumed
@@ -1275,9 +1277,11 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
                 DEQ_INSERT_TAIL(msg->content->buffers, msg->content->pending);
                 msg->content->pending = 0;
                 if (qd_message_Q2_holdoff_should_block((qd_message_t *)msg)) {
-                    msg->content->q2_input_holdoff = true;
-                    UNLOCK(msg->content->lock);
-                    break;
+                    if (!qd_link_is_q2_limit_unbounded(qdl)) {
+                        msg->content->q2_input_holdoff = true;
+                        UNLOCK(msg->content->lock);
+                        break;
+                    }
                 }
                 UNLOCK(msg->content->lock);
                 msg->content->pending = qd_buffer();

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ff669e7/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index cc704c1..d19afb9 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -305,18 +305,19 @@ static void log_link_message(qd_connection_t *conn, pn_link_t *pn_link, qd_messa
 /**
  * Inbound Delivery Handler
  */
-static void AMQP_rx_handler(void* context, qd_link_t *link)
+static bool AMQP_rx_handler(void* context, qd_link_t *link)
 {
     qd_router_t    *router       = (qd_router_t*) context;
     pn_link_t      *pn_link      = qd_link_pn(link);
+    bool            next_delivery = false;
     assert(pn_link);
 
     if (!pn_link)
-        return;
+        return next_delivery;
 
     pn_delivery_t  *pnd          = pn_link_current(pn_link);
     if (!pnd)
-        return;
+        return next_delivery;
     qdr_link_t     *rlink        = (qdr_link_t*) qd_link_get_context(link);
     qd_connection_t  *conn       = qd_link_connection(link);
     qdr_delivery_t *delivery     = qdr_node_delivery_qdr_from_pn(pnd);
@@ -349,11 +350,11 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
         // If there's another delivery pending then reschedule this.
         pn_delivery_t *npnd = pn_link_current(pn_link);
         if (npnd) {
-            qd_connection_invoke_deferred(conn, deferred_AMQP_rx_handler, link);
+            next_delivery = true;
         }
 
         if (qd_message_is_discard(msg)) {
-            return;
+            return next_delivery;
         }
     }
 
@@ -364,7 +365,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
     if (!rlink) {
         if (receive_complete) // The entire message has been received but there is nowhere to send it to, free it and do nothing.
             qd_message_free(msg);
-        return;
+        return next_delivery;
     }
 
     //
@@ -405,7 +406,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
             qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver_to_routed_link");
         }
 
-        return;
+        return next_delivery;
     }
 
     //
@@ -458,7 +459,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
     }
 
     if (!valid_message) {
-        return;
+        return next_delivery;
     }
 
     if (delivery) {
@@ -476,7 +477,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
             }
         }
 
-        return;
+        return next_delivery;
     }
 
     if (check_user) {
@@ -494,7 +495,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
                     pn_delivery_settle(pnd);
                     qd_message_free(msg);
                     qd_iterator_free(userid_iter);
-                    return;
+                    return next_delivery;
                 }
             }
             qd_iterator_free(userid_iter);
@@ -520,7 +521,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
         pn_delivery_update(pnd, PN_RELEASED);
         pn_delivery_settle(pnd);
         qd_message_free(msg);
-        return;
+        return next_delivery;
     }
 
     if (anonymous_link) {
@@ -596,6 +597,8 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
         delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions, ingress_index);
     }
 
+
+
     if (delivery) {
         //
         // Settle the proton delivery only if all the data has arrived
@@ -604,7 +607,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
             if (receive_complete) {
                 pn_delivery_settle(pnd);
                 qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver");
-                return;
+                return next_delivery;
             }
         }
 
@@ -620,6 +623,8 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
         pn_delivery_settle(pnd);
         qd_message_free(msg);
     }
+
+    return next_delivery;
 }
 
 
@@ -631,7 +636,10 @@ static void deferred_AMQP_rx_handler(void *context, bool discard) {
         qd_link_t     *qdl = (qd_link_t*)context;
         qd_router_t   *qdr = (qd_router_t *)qd_link_get_node_context(qdl);
         assert(qdr != 0);
-        AMQP_rx_handler(qdr, qdl);
+        while (true) {
+            if (! AMQP_rx_handler(qdr, qdl))
+                break;
+        }
     }
 }
 
@@ -781,6 +789,7 @@ static int AMQP_link_detach_handler(void* context, qd_link_t *link, qd_detach_ty
         qd_message_t *msg = qd_get_message_context(pnd);
         if (msg) {
             if (!qd_message_receive_complete(msg)) {
+                qd_link_set_q2_limit_unbounded(link, true);
                 qd_message_Q2_holdoff_disable(msg);
                 deferred_AMQP_rx_handler((void *)link, false);
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org