You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/09/25 21:11:18 UTC

qpid-dispatch git commit: DISPATCH-1129 - Moved call to forwarding multicast messages to end of forwarder function. Also modified qd_message_check_LH to not validate the message if bufferes have already been freed. This closes #383.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 15c46db55 -> fa03ea400


DISPATCH-1129 - Moved call to forwarding multicast messages to end of forwarder function. Also modified qd_message_check_LH to not validate the message if bufferes have already been freed. This closes #383.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/fa03ea40
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/fa03ea40
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/fa03ea40

Branch: refs/heads/master
Commit: fa03ea4008524ca7eb5ba4c6409ec0328eca2e87
Parents: 15c46db
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Mon Sep 24 13:06:47 2018 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Tue Sep 25 17:05:04 2018 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/message.h |  8 ++++++++
 src/message.c                   | 24 ++++++++++++++++++++--
 src/message_private.h           |  1 +
 src/router_core/forwarder.c     | 40 +++++++++++++++++++++++++++++++++---
 src/router_node.c               | 12 ++++++-----
 5 files changed, 75 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fa03ea40/include/qpid/dispatch/message.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index ec0b901..27bc294 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -221,6 +221,14 @@ void qd_message_set_ingress_annotation(qd_message_t *msg, qd_composed_field_t *i
 qd_message_t *qd_message_receive(pn_delivery_t *delivery);
 
 /**
+ * Returns the PN_DELIVERY_CTX record from the attachments
+ *
+ * @param delivery An incoming delivery from a link
+ * @return - pointer to qd_message_t object
+ */
+qd_message_t * qd_get_message_context(pn_delivery_t *delivery);
+
+/**
  * Send the message outbound on an outgoing link.
  *
  * @param msg A pointer to a message to be sent.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fa03ea40/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index b3c9cb8..77a49e5 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1137,6 +1137,7 @@ bool qd_message_receive_complete(qd_message_t *in_msg)
     return msg->content->receive_complete;
 }
 
+
 bool qd_message_send_complete(qd_message_t *in_msg)
 {
     if (!in_msg)
@@ -1207,6 +1208,15 @@ qd_message_t *discard_receive(pn_delivery_t *delivery,
     return msg_in;
 }
 
+qd_message_t * qd_get_message_context(pn_delivery_t *delivery)
+{
+    pn_record_t *record    = pn_delivery_attachments(delivery);
+    if (record)
+        return (qd_message_t *) pn_record_get(record, PN_DELIVERY_CTX);
+
+    return 0;
+}
+
 
 qd_message_t *qd_message_receive(pn_delivery_t *delivery)
 {
@@ -1229,7 +1239,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
         msg->strip_annotations_in  = qd_connection_strip_annotations_in(qdc);
         pn_record_def(record, PN_DELIVERY_CTX, PN_WEAKREF);
         pn_record_set(record, PN_DELIVERY_CTX, (void*) msg);
-    }
+}
 
     //
     // The discard flag indicates we should keep reading the input stream
@@ -1632,6 +1642,9 @@ void qd_message_send(qd_message_t *in_msg,
                 while (local_buf && local_buf != next_buf) {
                     DEQ_REMOVE_HEAD(content->buffers);
                     qd_buffer_free(local_buf);
+                    if (!msg->content->buffers_freed)
+                        msg->content->buffers_freed = true;
+
                     local_buf = DEQ_HEAD(content->buffers);
 
                     // by freeing a buffer there now may be room to restart a
@@ -1712,10 +1725,17 @@ static int qd_check_field_LH(qd_message_content_t *content,
 static bool qd_message_check_LH(qd_message_content_t *content, qd_message_depth_t depth)
 {
     qd_error_clear();
+
+    //
+    // In the case of a streaming or multi buffer message, there is a change that some buffers might be freed before the entire
+    // message has arrived in which case we cannot reliably check the message using the depth.
+    //
+    if (content->buffers_freed)
+        return true;
+
     qd_buffer_t *buffer  = DEQ_HEAD(content->buffers);
 
     if (!buffer) {
-        qd_error(QD_ERROR_MESSAGE, "No data");
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fa03ea40/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index 14d2593..7b6c4ad 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -123,6 +123,7 @@ typedef struct {
     bool                 q2_input_holdoff;               // hold off calling pn_link_recv
     bool                 aborted;                        // receive completed with abort flag set
     bool                 disable_q2_holdoff;             // Disable the Q2 flow control
+    bool                 buffers_freed;                   // Has at least one buffer been freed ?
     uint8_t              priority;
 } qd_message_content_t;
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fa03ea40/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 8ccb049..1a43eec 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -23,6 +23,17 @@
 #include <strings.h>
 #include "forwarder.h"
 
+typedef struct qdr_forward_deliver_info_t {
+    DEQ_LINKS(struct qdr_forward_deliver_info_t);
+    qdr_link_t     *out_link;
+    qdr_delivery_t *out_dlv;
+} qdr_forward_deliver_info_t;
+
+ALLOC_DECLARE(qdr_forward_deliver_info_t);
+DEQ_DECLARE(qdr_forward_deliver_info_t, qdr_forward_deliver_info_list_t);
+
+ALLOC_DEFINE(qdr_forward_deliver_info_t);
+
 
 static qdr_link_t * peer_data_link(qdr_core_t *core,
                                    qdr_node_t *node,
@@ -266,6 +277,8 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
     bool          presettled           = !!in_delivery ? in_delivery->settled : true;
     bool          receive_complete     = qd_message_receive_complete(qdr_delivery_message(in_delivery));
     int           priority             = qd_message_get_priority(msg);
+    qdr_forward_deliver_info_list_t deliver_info_list;
+    DEQ_INIT(deliver_info_list);
 
     //
     // If the delivery is not presettled, set the settled flag for forwarding so all
@@ -285,7 +298,14 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
         while (link_ref) {
             qdr_link_t     *out_link     = link_ref->link;
             qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
-            qdr_forward_deliver_CT(core, out_link, out_delivery);
+
+            // Store the out_link and out_delivery so we can forward the delivery later on
+            qdr_forward_deliver_info_t *deliver_info = new_qdr_forward_deliver_info_t();
+            ZERO(deliver_info);
+            deliver_info->out_dlv = out_delivery;
+            deliver_info->out_link = out_link;
+            DEQ_INSERT_TAIL(deliver_info_list, deliver_info);
+
             fanout++;
             if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type != QD_LINK_ROUTER) {
                 addr->deliveries_egress++;
@@ -360,7 +380,14 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
                 core->data_links_by_mask_bit[link_bit].links[priority];
             if (dest_link && (!link_exclusion || qd_bitmask_value(link_exclusion, link_bit) == 0)) {
                 qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, dest_link, msg);
-                qdr_forward_deliver_CT(core, dest_link, out_delivery);
+
+                // Store the out_link and out_delivery so we can forward the delivery later on
+                qdr_forward_deliver_info_t *deliver_info = new_qdr_forward_deliver_info_t();
+                ZERO(deliver_info);
+                deliver_info->out_dlv = out_delivery;
+                deliver_info->out_link = dest_link;
+                DEQ_INSERT_TAIL(deliver_info_list, deliver_info);
+
                 fanout++;
                 addr->deliveries_transit++;
                 if (dest_link->link_type == QD_LINK_ROUTER)
@@ -375,7 +402,6 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
         //
         // Forward to in-process subscribers
         //
-
         qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions);
         while (sub) {
             //
@@ -399,6 +425,14 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
         }
     }
 
+    qdr_forward_deliver_info_t *deliver_info = DEQ_HEAD(deliver_info_list);
+    while (deliver_info) {
+        qdr_forward_deliver_CT(core, deliver_info->out_link, deliver_info->out_dlv);
+        DEQ_REMOVE_HEAD(deliver_info_list);
+        free_qdr_forward_deliver_info_t(deliver_info);
+        deliver_info = DEQ_HEAD(deliver_info_list);
+    }
+
     if (in_delivery && !presettled) {
         if (fanout == 0)
             //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fa03ea40/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 5f5efc6..cc704c1 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -778,12 +778,14 @@ static int AMQP_link_detach_handler(void* context, qd_link_t *link, qd_detach_ty
     pn_delivery_t  *pnd          = pn_link_current(pn_link);
 
     if (pnd) {
-        qd_message_t   *msg   = qd_message_receive(pnd);
-
-        if (!qd_message_receive_complete(msg)) {
-            qd_message_Q2_holdoff_disable(msg);
-            deferred_AMQP_rx_handler((void *)link, false);
+        qd_message_t *msg = qd_get_message_context(pnd);
+        if (msg) {
+            if (!qd_message_receive_complete(msg)) {
+                qd_message_Q2_holdoff_disable(msg);
+                deferred_AMQP_rx_handler((void *)link, false);
+            }
         }
+
     }
 
     qd_router_t    *router = (qd_router_t*) context;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org