You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/09/24 19:10:01 UTC
qpid-dispatch git commit: DISPATCH-1129 - Moved call to forwarding
multicast messages to end of forwarder function. Also modified
qd_message_check_LH to not validate the message if bufferes have already been
freed
Repository: qpid-dispatch
Updated Branches:
refs/heads/AARON-PROBLEM [created] 00edc49fe
DISPATCH-1129 - Moved call to forwarding multicast messages to end of forwarder function. Also modified qd_message_check_LH to not validate the message if bufferes have already been freed
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/00edc49f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/00edc49f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/00edc49f
Branch: refs/heads/AARON-PROBLEM
Commit: 00edc49fec82aaabcf4d3bfc65e24ed0b73d1d62
Parents: 12692a7
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Mon Sep 24 13:06:47 2018 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Mon Sep 24 15:01:28 2018 -0400
----------------------------------------------------------------------
include/qpid/dispatch/message.h | 22 +++++++++++++++
src/message.c | 44 ++++++++++++++++++++++++++++--
src/message_private.h | 1 +
src/router_core/forwarder.c | 31 +++++++++++++++++++--
src/router_core/router_core_private.h | 11 ++++++++
src/router_node.c | 12 ++++----
6 files changed, 110 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/00edc49f/include/qpid/dispatch/message.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index ec0b901..915094a 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -221,6 +221,13 @@ void qd_message_set_ingress_annotation(qd_message_t *msg, qd_composed_field_t *i
qd_message_t *qd_message_receive(pn_delivery_t *delivery);
/**
+ * Returns the PN_DELIVERY_CTX record from the attachments
+ *
+ * @param delivery An incoming delivery from a link
+ */
+qd_message_t * qd_pn_delivery_get_delivery_context(pn_delivery_t *delivery);
+
+/**
* Send the message outbound on an outgoing link.
*
* @param msg A pointer to a message to be sent.
@@ -335,6 +342,21 @@ void qd_message_set_discard(qd_message_t *msg, bool discard);
bool qd_message_receive_complete(qd_message_t *msg);
/**
+ * Returns true if at least one message buffer has been freed, false otherwise.
+ * @param msg A pointer to the message.
+ */
+bool qd_message_is_buffers_freed(qd_message_t *in_msg);
+
+/**
+ *Set the buffers_freed field on the message to to the passed in boolean value.
+ *
+ * @param msg A pointer to the message.
+ * @param discard - the boolean value of buffers_freed.
+ */
+
+void qd_message_set_buffers_freed(qd_message_t *msg, bool buffers_freed);
+
+/**
* Returns true if the message has been completely received AND the message has been completely sent.
*/
bool qd_message_send_complete(qd_message_t *msg);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/00edc49f/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index b3c9cb8..eef8858 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1137,6 +1137,26 @@ bool qd_message_receive_complete(qd_message_t *in_msg)
return msg->content->receive_complete;
}
+
+bool qd_message_is_buffers_freed(qd_message_t *in_msg)
+{
+ if (!in_msg)
+ return false;
+ qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+ return msg->content->buffers_freed;
+}
+
+
+void qd_message_set_buffers_freed(qd_message_t *msg, bool buffers_freed)
+{
+ if (!msg)
+ return;
+
+ qd_message_pvt_t *pvt_msg = (qd_message_pvt_t*) msg;
+ pvt_msg->content->buffers_freed = buffers_freed;
+}
+
+
bool qd_message_send_complete(qd_message_t *in_msg)
{
if (!in_msg)
@@ -1207,6 +1227,15 @@ qd_message_t *discard_receive(pn_delivery_t *delivery,
return msg_in;
}
+qd_message_t * qd_pn_delivery_get_delivery_context(pn_delivery_t *delivery)
+{
+ pn_record_t *record = pn_delivery_attachments(delivery);
+ if (record)
+ return pn_record_get(record, PN_DELIVERY_CTX);
+
+ return 0;
+}
+
qd_message_t *qd_message_receive(pn_delivery_t *delivery)
{
@@ -1214,7 +1243,8 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
ssize_t rc;
pn_record_t *record = pn_delivery_attachments(delivery);
- qd_message_pvt_t *msg = (qd_message_pvt_t*) pn_record_get(record, PN_DELIVERY_CTX);
+ qd_message_t *m = pn_record_get(record, PN_DELIVERY_CTX);
+ qd_message_pvt_t *msg = (qd_message_pvt_t*) m;
//
// If there is no message associated with the delivery then this is the
@@ -1222,13 +1252,14 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
// Allocate a message descriptor and link it and the delivery together.
//
if (!msg) {
- msg = (qd_message_pvt_t*) qd_message();
+ m = qd_message();
+ msg = (qd_message_pvt_t*) m;
qd_link_t *qdl = (qd_link_t *)pn_link_get_context(link);
qd_connection_t *qdc = qd_link_connection(qdl);
msg->content->input_link = pn_link_get_context(link);
msg->strip_annotations_in = qd_connection_strip_annotations_in(qdc);
pn_record_def(record, PN_DELIVERY_CTX, PN_WEAKREF);
- pn_record_set(record, PN_DELIVERY_CTX, (void*) msg);
+ pn_record_set(record, PN_DELIVERY_CTX, (void*) m);
}
//
@@ -1632,6 +1663,9 @@ void qd_message_send(qd_message_t *in_msg,
while (local_buf && local_buf != next_buf) {
DEQ_REMOVE_HEAD(content->buffers);
qd_buffer_free(local_buf);
+ if (!qd_message_is_buffers_freed(in_msg))
+ qd_message_set_buffers_freed(in_msg, true);
+
local_buf = DEQ_HEAD(content->buffers);
// by freeing a buffer there now may be room to restart a
@@ -1712,6 +1746,10 @@ static int qd_check_field_LH(qd_message_content_t *content,
static bool qd_message_check_LH(qd_message_content_t *content, qd_message_depth_t depth)
{
qd_error_clear();
+
+ if (content->buffers_freed)
+ return true;
+
qd_buffer_t *buffer = DEQ_HEAD(content->buffers);
if (!buffer) {
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/00edc49f/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index 14d2593..c45573a 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -123,6 +123,7 @@ typedef struct {
bool q2_input_holdoff; // hold off calling pn_link_recv
bool aborted; // receive completed with abort flag set
bool disable_q2_holdoff; // Disable the Q2 flow control
+ bool buffers_freed; // Have any message content buffers be freed ?
uint8_t priority;
} qd_message_content_t;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/00edc49f/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 8ccb049..4286e59 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -23,6 +23,8 @@
#include <strings.h>
#include "forwarder.h"
+ALLOC_DEFINE(qdr_forward_deliver_info_t);
+
static qdr_link_t * peer_data_link(qdr_core_t *core,
qdr_node_t *node,
@@ -266,6 +268,8 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
bool presettled = !!in_delivery ? in_delivery->settled : true;
bool receive_complete = qd_message_receive_complete(qdr_delivery_message(in_delivery));
int priority = qd_message_get_priority(msg);
+ qdr_forward_deliver_info_list_t deliver_info_list;
+ DEQ_INIT(deliver_info_list);
//
// If the delivery is not presettled, set the settled flag for forwarding so all
@@ -285,7 +289,14 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
while (link_ref) {
qdr_link_t *out_link = link_ref->link;
qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
- qdr_forward_deliver_CT(core, out_link, out_delivery);
+
+ // Store the out_link and out_delivery so we can forward the delivery later on
+ qdr_forward_deliver_info_t *deliver_info = new_qdr_forward_deliver_info_t();
+ ZERO(deliver_info);
+ deliver_info->out_dlv = out_delivery;
+ deliver_info->out_link = out_link;
+ DEQ_INSERT_TAIL(deliver_info_list, deliver_info);
+
fanout++;
if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type != QD_LINK_ROUTER) {
addr->deliveries_egress++;
@@ -360,7 +371,14 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
core->data_links_by_mask_bit[link_bit].links[priority];
if (dest_link && (!link_exclusion || qd_bitmask_value(link_exclusion, link_bit) == 0)) {
qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, dest_link, msg);
- qdr_forward_deliver_CT(core, dest_link, out_delivery);
+
+ // Store the out_link and out_delivery so we can forward the delivery later on
+ qdr_forward_deliver_info_t *deliver_info = new_qdr_forward_deliver_info_t();
+ ZERO(deliver_info);
+ deliver_info->out_dlv = out_delivery;
+ deliver_info->out_link = dest_link;
+ DEQ_INSERT_TAIL(deliver_info_list, deliver_info);
+
fanout++;
addr->deliveries_transit++;
if (dest_link->link_type == QD_LINK_ROUTER)
@@ -375,7 +393,6 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
//
// Forward to in-process subscribers
//
-
qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions);
while (sub) {
//
@@ -399,6 +416,14 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
}
}
+ qdr_forward_deliver_info_t *deliver_info = DEQ_HEAD(deliver_info_list);
+ while (deliver_info) {
+ qdr_forward_deliver_CT(core, deliver_info->out_link, deliver_info->out_dlv);
+ DEQ_REMOVE_HEAD(deliver_info_list);
+ free_qdr_forward_deliver_info_t(deliver_info);
+ deliver_info = DEQ_HEAD(deliver_info_list);
+ }
+
if (in_delivery && !presettled) {
if (fanout == 0)
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/00edc49f/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 13c7dc4..cb67b4f 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -440,6 +440,17 @@ struct qdr_link_ref_t {
ALLOC_DECLARE(qdr_link_ref_t);
DEQ_DECLARE(qdr_link_ref_t, qdr_link_ref_list_t);
+
+typedef struct qdr_forward_deliver_info_t {
+ DEQ_LINKS(struct qdr_forward_deliver_info_t);
+ qdr_link_t *out_link;
+ qdr_delivery_t *out_dlv;
+} qdr_forward_deliver_info_t;
+
+ALLOC_DECLARE(qdr_forward_deliver_info_t);
+DEQ_DECLARE(qdr_forward_deliver_info_t, qdr_forward_deliver_info_list_t);
+
+
void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
void move_link_ref(qdr_link_t *link, int from_cls, int to_cls);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/00edc49f/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 5f5efc6..f3f0c50 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -778,12 +778,14 @@ static int AMQP_link_detach_handler(void* context, qd_link_t *link, qd_detach_ty
pn_delivery_t *pnd = pn_link_current(pn_link);
if (pnd) {
- qd_message_t *msg = qd_message_receive(pnd);
-
- if (!qd_message_receive_complete(msg)) {
- qd_message_Q2_holdoff_disable(msg);
- deferred_AMQP_rx_handler((void *)link, false);
+ qd_message_t *msg = qd_pn_delivery_get_delivery_context(pnd);
+ if (msg) {
+ if (!qd_message_receive_complete(msg)) {
+ qd_message_Q2_holdoff_disable(msg);
+ deferred_AMQP_rx_handler((void *)link, false);
+ }
}
+
}
qd_router_t *router = (qd_router_t*) context;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org