You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/09/24 19:10:01 UTC

qpid-dispatch git commit: DISPATCH-1129 - Moved call to forwarding multicast messages to end of forwarder function. Also modified qd_message_check_LH to not validate the message if bufferes have already been freed

Repository: qpid-dispatch
Updated Branches:
  refs/heads/AARON-PROBLEM [created] 00edc49fe


DISPATCH-1129 - Moved call to forwarding multicast messages to end of forwarder function. Also modified qd_message_check_LH to not validate the message if bufferes have already been freed


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/00edc49f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/00edc49f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/00edc49f

Branch: refs/heads/AARON-PROBLEM
Commit: 00edc49fec82aaabcf4d3bfc65e24ed0b73d1d62
Parents: 12692a7
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Mon Sep 24 13:06:47 2018 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Mon Sep 24 15:01:28 2018 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/message.h       | 22 +++++++++++++++
 src/message.c                         | 44 ++++++++++++++++++++++++++++--
 src/message_private.h                 |  1 +
 src/router_core/forwarder.c           | 31 +++++++++++++++++++--
 src/router_core/router_core_private.h | 11 ++++++++
 src/router_node.c                     | 12 ++++----
 6 files changed, 110 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/00edc49f/include/qpid/dispatch/message.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index ec0b901..915094a 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -221,6 +221,13 @@ void qd_message_set_ingress_annotation(qd_message_t *msg, qd_composed_field_t *i
 qd_message_t *qd_message_receive(pn_delivery_t *delivery);
 
 /**
+ * Returns the PN_DELIVERY_CTX record from the attachments
+ *
+ * @param delivery An incoming delivery from a link
+ */
+qd_message_t * qd_pn_delivery_get_delivery_context(pn_delivery_t *delivery);
+
+/**
  * Send the message outbound on an outgoing link.
  *
  * @param msg A pointer to a message to be sent.
@@ -335,6 +342,21 @@ void qd_message_set_discard(qd_message_t *msg, bool discard);
 bool qd_message_receive_complete(qd_message_t *msg);
 
 /**
+ * Returns true if at least one message buffer has been freed, false otherwise.
+ * @param msg A pointer to the message.
+ */
+bool qd_message_is_buffers_freed(qd_message_t *in_msg);
+
+/**
+ *Set the buffers_freed field on the message to to the passed in boolean value.
+ *
+ * @param msg A pointer to the message.
+ * @param discard - the boolean value of buffers_freed.
+ */
+
+void qd_message_set_buffers_freed(qd_message_t *msg, bool buffers_freed);
+
+/**
  * Returns true if the message has been completely received AND the message has been completely sent.
  */
 bool qd_message_send_complete(qd_message_t *msg);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/00edc49f/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index b3c9cb8..eef8858 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1137,6 +1137,26 @@ bool qd_message_receive_complete(qd_message_t *in_msg)
     return msg->content->receive_complete;
 }
 
+
+bool qd_message_is_buffers_freed(qd_message_t *in_msg)
+{
+    if (!in_msg)
+        return false;
+    qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
+    return msg->content->buffers_freed;
+}
+
+
+void qd_message_set_buffers_freed(qd_message_t *msg, bool buffers_freed)
+{
+    if (!msg)
+        return;
+
+    qd_message_pvt_t *pvt_msg = (qd_message_pvt_t*) msg;
+    pvt_msg->content->buffers_freed = buffers_freed;
+}
+
+
 bool qd_message_send_complete(qd_message_t *in_msg)
 {
     if (!in_msg)
@@ -1207,6 +1227,15 @@ qd_message_t *discard_receive(pn_delivery_t *delivery,
     return msg_in;
 }
 
+qd_message_t * qd_pn_delivery_get_delivery_context(pn_delivery_t *delivery)
+{
+    pn_record_t *record    = pn_delivery_attachments(delivery);
+    if (record)
+        return pn_record_get(record, PN_DELIVERY_CTX);
+
+    return 0;
+}
+
 
 qd_message_t *qd_message_receive(pn_delivery_t *delivery)
 {
@@ -1214,7 +1243,8 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
     ssize_t           rc;
 
     pn_record_t *record    = pn_delivery_attachments(delivery);
-    qd_message_pvt_t *msg  = (qd_message_pvt_t*) pn_record_get(record, PN_DELIVERY_CTX);
+    qd_message_t *m = pn_record_get(record, PN_DELIVERY_CTX);
+    qd_message_pvt_t *msg  = (qd_message_pvt_t*) m;
 
     //
     // If there is no message associated with the delivery then this is the
@@ -1222,13 +1252,14 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
     // Allocate a message descriptor and link it and the delivery together.
     //
     if (!msg) {
-        msg = (qd_message_pvt_t*) qd_message();
+        m = qd_message();
+        msg = (qd_message_pvt_t*) m;
         qd_link_t       *qdl = (qd_link_t *)pn_link_get_context(link);
         qd_connection_t *qdc = qd_link_connection(qdl);
         msg->content->input_link = pn_link_get_context(link);
         msg->strip_annotations_in  = qd_connection_strip_annotations_in(qdc);
         pn_record_def(record, PN_DELIVERY_CTX, PN_WEAKREF);
-        pn_record_set(record, PN_DELIVERY_CTX, (void*) msg);
+        pn_record_set(record, PN_DELIVERY_CTX, (void*) m);
     }
 
     //
@@ -1632,6 +1663,9 @@ void qd_message_send(qd_message_t *in_msg,
                 while (local_buf && local_buf != next_buf) {
                     DEQ_REMOVE_HEAD(content->buffers);
                     qd_buffer_free(local_buf);
+                    if (!qd_message_is_buffers_freed(in_msg))
+                        qd_message_set_buffers_freed(in_msg, true);
+
                     local_buf = DEQ_HEAD(content->buffers);
 
                     // by freeing a buffer there now may be room to restart a
@@ -1712,6 +1746,10 @@ static int qd_check_field_LH(qd_message_content_t *content,
 static bool qd_message_check_LH(qd_message_content_t *content, qd_message_depth_t depth)
 {
     qd_error_clear();
+
+    if (content->buffers_freed)
+        return true;
+
     qd_buffer_t *buffer  = DEQ_HEAD(content->buffers);
 
     if (!buffer) {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/00edc49f/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index 14d2593..c45573a 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -123,6 +123,7 @@ typedef struct {
     bool                 q2_input_holdoff;               // hold off calling pn_link_recv
     bool                 aborted;                        // receive completed with abort flag set
     bool                 disable_q2_holdoff;             // Disable the Q2 flow control
+    bool                 buffers_freed;                  // Have any message content buffers be freed ?
     uint8_t              priority;
 } qd_message_content_t;
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/00edc49f/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 8ccb049..4286e59 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -23,6 +23,8 @@
 #include <strings.h>
 #include "forwarder.h"
 
+ALLOC_DEFINE(qdr_forward_deliver_info_t);
+
 
 static qdr_link_t * peer_data_link(qdr_core_t *core,
                                    qdr_node_t *node,
@@ -266,6 +268,8 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
     bool          presettled           = !!in_delivery ? in_delivery->settled : true;
     bool          receive_complete     = qd_message_receive_complete(qdr_delivery_message(in_delivery));
     int           priority             = qd_message_get_priority(msg);
+    qdr_forward_deliver_info_list_t deliver_info_list;
+    DEQ_INIT(deliver_info_list);
 
     //
     // If the delivery is not presettled, set the settled flag for forwarding so all
@@ -285,7 +289,14 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
         while (link_ref) {
             qdr_link_t     *out_link     = link_ref->link;
             qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
-            qdr_forward_deliver_CT(core, out_link, out_delivery);
+
+            // Store the out_link and out_delivery so we can forward the delivery later on
+            qdr_forward_deliver_info_t *deliver_info = new_qdr_forward_deliver_info_t();
+            ZERO(deliver_info);
+            deliver_info->out_dlv = out_delivery;
+            deliver_info->out_link = out_link;
+            DEQ_INSERT_TAIL(deliver_info_list, deliver_info);
+
             fanout++;
             if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type != QD_LINK_ROUTER) {
                 addr->deliveries_egress++;
@@ -360,7 +371,14 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
                 core->data_links_by_mask_bit[link_bit].links[priority];
             if (dest_link && (!link_exclusion || qd_bitmask_value(link_exclusion, link_bit) == 0)) {
                 qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, dest_link, msg);
-                qdr_forward_deliver_CT(core, dest_link, out_delivery);
+
+                // Store the out_link and out_delivery so we can forward the delivery later on
+                qdr_forward_deliver_info_t *deliver_info = new_qdr_forward_deliver_info_t();
+                ZERO(deliver_info);
+                deliver_info->out_dlv = out_delivery;
+                deliver_info->out_link = dest_link;
+                DEQ_INSERT_TAIL(deliver_info_list, deliver_info);
+
                 fanout++;
                 addr->deliveries_transit++;
                 if (dest_link->link_type == QD_LINK_ROUTER)
@@ -375,7 +393,6 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
         //
         // Forward to in-process subscribers
         //
-
         qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions);
         while (sub) {
             //
@@ -399,6 +416,14 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
         }
     }
 
+    qdr_forward_deliver_info_t *deliver_info = DEQ_HEAD(deliver_info_list);
+    while (deliver_info) {
+        qdr_forward_deliver_CT(core, deliver_info->out_link, deliver_info->out_dlv);
+        DEQ_REMOVE_HEAD(deliver_info_list);
+        free_qdr_forward_deliver_info_t(deliver_info);
+        deliver_info = DEQ_HEAD(deliver_info_list);
+    }
+
     if (in_delivery && !presettled) {
         if (fanout == 0)
             //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/00edc49f/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 13c7dc4..cb67b4f 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -440,6 +440,17 @@ struct qdr_link_ref_t {
 ALLOC_DECLARE(qdr_link_ref_t);
 DEQ_DECLARE(qdr_link_ref_t, qdr_link_ref_list_t);
 
+
+typedef struct qdr_forward_deliver_info_t {
+    DEQ_LINKS(struct qdr_forward_deliver_info_t);
+    qdr_link_t     *out_link;
+    qdr_delivery_t *out_dlv;
+} qdr_forward_deliver_info_t;
+
+ALLOC_DECLARE(qdr_forward_deliver_info_t);
+DEQ_DECLARE(qdr_forward_deliver_info_t, qdr_forward_deliver_info_list_t);
+
+
 void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
 void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
 void move_link_ref(qdr_link_t *link, int from_cls, int to_cls);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/00edc49f/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 5f5efc6..f3f0c50 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -778,12 +778,14 @@ static int AMQP_link_detach_handler(void* context, qd_link_t *link, qd_detach_ty
     pn_delivery_t  *pnd          = pn_link_current(pn_link);
 
     if (pnd) {
-        qd_message_t   *msg   = qd_message_receive(pnd);
-
-        if (!qd_message_receive_complete(msg)) {
-            qd_message_Q2_holdoff_disable(msg);
-            deferred_AMQP_rx_handler((void *)link, false);
+        qd_message_t *msg = qd_pn_delivery_get_delivery_context(pnd);
+        if (msg) {
+            if (!qd_message_receive_complete(msg)) {
+                qd_message_Q2_holdoff_disable(msg);
+                deferred_AMQP_rx_handler((void *)link, false);
+            }
         }
+
     }
 
     qd_router_t    *router = (qd_router_t*) context;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org