You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2019/05/03 20:54:14 UTC
[qpid-dispatch] 01/02: DISPATCH-1330: fix Q2 stall due to msg
buffer refcount error
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 9bca5ddf0f373dff7037868da2762033bf47bbc5
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Wed May 1 10:56:50 2019 -0400
DISPATCH-1330: fix Q2 stall due to msg buffer refcount error
Also remove some dead code.
This closes #498
---
src/message.c | 31 +++++++++++++++++--------------
src/message_private.h | 2 --
2 files changed, 17 insertions(+), 16 deletions(-)
diff --git a/src/message.c b/src/message.c
index c9ff14e..4434187 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1104,14 +1104,15 @@ void qd_message_set_discard(qd_message_t *msg, bool discard)
}
+// update the buffer reference counts for a new outgoing message
+//
void qd_message_add_fanout(qd_message_t *in_msg,
qd_message_t *out_msg)
{
+ if (!out_msg)
+ return;
- // out_msg will be 0 if we are forwarding to an internal subscriber (like
- // $management). If so we treat in_msg like an out_msg
- assert(in_msg);
- qd_message_pvt_t *msg = (qd_message_pvt_t *)((out_msg) ? out_msg : in_msg);
+ qd_message_pvt_t *msg = (qd_message_pvt_t *)out_msg;
msg->is_fanout = true;
qd_message_content_t *content = msg->content;
@@ -1119,11 +1120,19 @@ void qd_message_add_fanout(qd_message_t *in_msg,
LOCK(content->lock);
++content->fanout;
- // do not free the buffers until all fanout consumers are done with them
+ // do not free the buffers until all fanout senders are done with them
qd_buffer_t *buf = DEQ_HEAD(content->buffers);
- while (buf) {
- qd_buffer_inc_fanout(buf);
- buf = DEQ_NEXT(buf);
+ if (buf) {
+ // DISPATCH-1330: since we're incrementing the refcount be sure to set
+ // the cursor to the head buf in case msg is discarded before all data
+ // is sent (we'll decref any unsent buffers at that time)
+ //
+ msg->cursor.buffer = buf;
+
+ while (buf) {
+ qd_buffer_inc_fanout(buf);
+ buf = DEQ_NEXT(buf);
+ }
}
UNLOCK(content->lock);
}
@@ -1182,12 +1191,6 @@ void qd_message_set_tag_sent(qd_message_t *in_msg, bool tag_sent)
msg->tag_sent = tag_sent;
}
-qd_iterator_pointer_t qd_message_cursor(qd_message_pvt_t *in_msg)
-{
- qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
- return msg->cursor;
-}
-
/**
* Receive and discard large messages for which there is no destination.
diff --git a/src/message_private.h b/src/message_private.h
index 850c534..daae03a 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -147,8 +147,6 @@ ALLOC_DECLARE(qd_message_content_t);
/** Initialize logging */
void qd_message_initialize();
-qd_iterator_pointer_t qd_message_cursor(qd_message_pvt_t *msg);
-
#define QDR_N_PRIORITIES 10
#define QDR_MAX_PRIORITY (QDR_N_PRIORITIES - 1)
#define QDR_DEFAULT_PRIORITY 4
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org