You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/09/25 21:11:18 UTC
qpid-dispatch git commit: DISPATCH-1129 - Moved call to forwarding
multicast messages to end of forwarder function. Also modified
qd_message_check_LH to not validate the message if buffers have already been
freed. This closes #383.
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 15c46db55 -> fa03ea400
DISPATCH-1129 - Moved call to forwarding multicast messages to end of forwarder function. Also modified qd_message_check_LH to not validate the message if buffers have already been freed. This closes #383.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/fa03ea40
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/fa03ea40
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/fa03ea40
Branch: refs/heads/master
Commit: fa03ea4008524ca7eb5ba4c6409ec0328eca2e87
Parents: 15c46db
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Mon Sep 24 13:06:47 2018 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Tue Sep 25 17:05:04 2018 -0400
----------------------------------------------------------------------
include/qpid/dispatch/message.h | 8 ++++++++
src/message.c | 24 ++++++++++++++++++++--
src/message_private.h | 1 +
src/router_core/forwarder.c | 40 +++++++++++++++++++++++++++++++++---
src/router_node.c | 12 ++++++-----
5 files changed, 75 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fa03ea40/include/qpid/dispatch/message.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index ec0b901..27bc294 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -221,6 +221,14 @@ void qd_message_set_ingress_annotation(qd_message_t *msg, qd_composed_field_t *i
qd_message_t *qd_message_receive(pn_delivery_t *delivery);
/**
+ * Returns the PN_DELIVERY_CTX record from the attachments
+ *
+ * @param delivery An incoming delivery from a link
+ * @return - pointer to qd_message_t object
+ */
+qd_message_t * qd_get_message_context(pn_delivery_t *delivery);
+
+/**
* Send the message outbound on an outgoing link.
*
* @param msg A pointer to a message to be sent.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fa03ea40/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index b3c9cb8..77a49e5 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1137,6 +1137,7 @@ bool qd_message_receive_complete(qd_message_t *in_msg)
return msg->content->receive_complete;
}
+
bool qd_message_send_complete(qd_message_t *in_msg)
{
if (!in_msg)
@@ -1207,6 +1208,15 @@ qd_message_t *discard_receive(pn_delivery_t *delivery,
return msg_in;
}
+qd_message_t * qd_get_message_context(pn_delivery_t *delivery)
+{
+ pn_record_t *record = pn_delivery_attachments(delivery);
+ if (record)
+ return (qd_message_t *) pn_record_get(record, PN_DELIVERY_CTX);
+
+ return 0;
+}
+
qd_message_t *qd_message_receive(pn_delivery_t *delivery)
{
@@ -1229,7 +1239,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
msg->strip_annotations_in = qd_connection_strip_annotations_in(qdc);
pn_record_def(record, PN_DELIVERY_CTX, PN_WEAKREF);
pn_record_set(record, PN_DELIVERY_CTX, (void*) msg);
- }
+}
//
// The discard flag indicates we should keep reading the input stream
@@ -1632,6 +1642,9 @@ void qd_message_send(qd_message_t *in_msg,
while (local_buf && local_buf != next_buf) {
DEQ_REMOVE_HEAD(content->buffers);
qd_buffer_free(local_buf);
+ if (!msg->content->buffers_freed)
+ msg->content->buffers_freed = true;
+
local_buf = DEQ_HEAD(content->buffers);
// by freeing a buffer there now may be room to restart a
@@ -1712,10 +1725,17 @@ static int qd_check_field_LH(qd_message_content_t *content,
static bool qd_message_check_LH(qd_message_content_t *content, qd_message_depth_t depth)
{
qd_error_clear();
+
+ //
+ // In the case of a streaming or multi-buffer message, there is a chance that some buffers might be freed before the entire
+ // message has arrived, in which case we cannot reliably check the message using the depth.
+ //
+ if (content->buffers_freed)
+ return true;
+
qd_buffer_t *buffer = DEQ_HEAD(content->buffers);
if (!buffer) {
- qd_error(QD_ERROR_MESSAGE, "No data");
return false;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fa03ea40/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index 14d2593..7b6c4ad 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -123,6 +123,7 @@ typedef struct {
bool q2_input_holdoff; // hold off calling pn_link_recv
bool aborted; // receive completed with abort flag set
bool disable_q2_holdoff; // Disable the Q2 flow control
+ bool buffers_freed; // Has at least one buffer been freed ?
uint8_t priority;
} qd_message_content_t;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fa03ea40/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 8ccb049..1a43eec 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -23,6 +23,17 @@
#include <strings.h>
#include "forwarder.h"
+typedef struct qdr_forward_deliver_info_t {
+ DEQ_LINKS(struct qdr_forward_deliver_info_t);
+ qdr_link_t *out_link;
+ qdr_delivery_t *out_dlv;
+} qdr_forward_deliver_info_t;
+
+ALLOC_DECLARE(qdr_forward_deliver_info_t);
+DEQ_DECLARE(qdr_forward_deliver_info_t, qdr_forward_deliver_info_list_t);
+
+ALLOC_DEFINE(qdr_forward_deliver_info_t);
+
static qdr_link_t * peer_data_link(qdr_core_t *core,
qdr_node_t *node,
@@ -266,6 +277,8 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
bool presettled = !!in_delivery ? in_delivery->settled : true;
bool receive_complete = qd_message_receive_complete(qdr_delivery_message(in_delivery));
int priority = qd_message_get_priority(msg);
+ qdr_forward_deliver_info_list_t deliver_info_list;
+ DEQ_INIT(deliver_info_list);
//
// If the delivery is not presettled, set the settled flag for forwarding so all
@@ -285,7 +298,14 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
while (link_ref) {
qdr_link_t *out_link = link_ref->link;
qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
- qdr_forward_deliver_CT(core, out_link, out_delivery);
+
+ // Store the out_link and out_delivery so we can forward the delivery later on
+ qdr_forward_deliver_info_t *deliver_info = new_qdr_forward_deliver_info_t();
+ ZERO(deliver_info);
+ deliver_info->out_dlv = out_delivery;
+ deliver_info->out_link = out_link;
+ DEQ_INSERT_TAIL(deliver_info_list, deliver_info);
+
fanout++;
if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type != QD_LINK_ROUTER) {
addr->deliveries_egress++;
@@ -360,7 +380,14 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
core->data_links_by_mask_bit[link_bit].links[priority];
if (dest_link && (!link_exclusion || qd_bitmask_value(link_exclusion, link_bit) == 0)) {
qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, dest_link, msg);
- qdr_forward_deliver_CT(core, dest_link, out_delivery);
+
+ // Store the out_link and out_delivery so we can forward the delivery later on
+ qdr_forward_deliver_info_t *deliver_info = new_qdr_forward_deliver_info_t();
+ ZERO(deliver_info);
+ deliver_info->out_dlv = out_delivery;
+ deliver_info->out_link = dest_link;
+ DEQ_INSERT_TAIL(deliver_info_list, deliver_info);
+
fanout++;
addr->deliveries_transit++;
if (dest_link->link_type == QD_LINK_ROUTER)
@@ -375,7 +402,6 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
//
// Forward to in-process subscribers
//
-
qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions);
while (sub) {
//
@@ -399,6 +425,14 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
}
}
+ qdr_forward_deliver_info_t *deliver_info = DEQ_HEAD(deliver_info_list);
+ while (deliver_info) {
+ qdr_forward_deliver_CT(core, deliver_info->out_link, deliver_info->out_dlv);
+ DEQ_REMOVE_HEAD(deliver_info_list);
+ free_qdr_forward_deliver_info_t(deliver_info);
+ deliver_info = DEQ_HEAD(deliver_info_list);
+ }
+
if (in_delivery && !presettled) {
if (fanout == 0)
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fa03ea40/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 5f5efc6..cc704c1 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -778,12 +778,14 @@ static int AMQP_link_detach_handler(void* context, qd_link_t *link, qd_detach_ty
pn_delivery_t *pnd = pn_link_current(pn_link);
if (pnd) {
- qd_message_t *msg = qd_message_receive(pnd);
-
- if (!qd_message_receive_complete(msg)) {
- qd_message_Q2_holdoff_disable(msg);
- deferred_AMQP_rx_handler((void *)link, false);
+ qd_message_t *msg = qd_get_message_context(pnd);
+ if (msg) {
+ if (!qd_message_receive_complete(msg)) {
+ qd_message_Q2_holdoff_disable(msg);
+ deferred_AMQP_rx_handler((void *)link, false);
+ }
}
+
}
qd_router_t *router = (qd_router_t*) context;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org