You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2019/05/03 20:54:14 UTC

[qpid-dispatch] 01/02: DISPATCH-1330: fix Q2 stall due to msg buffer refcount error

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 9bca5ddf0f373dff7037868da2762033bf47bbc5
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Wed May 1 10:56:50 2019 -0400

    DISPATCH-1330: fix Q2 stall due to msg buffer refcount error
    
    Also remove some dead code.
    
    This closes #498
---
 src/message.c         | 31 +++++++++++++++++--------------
 src/message_private.h |  2 --
 2 files changed, 17 insertions(+), 16 deletions(-)

diff --git a/src/message.c b/src/message.c
index c9ff14e..4434187 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1104,14 +1104,15 @@ void qd_message_set_discard(qd_message_t *msg, bool discard)
 }
 
 
+// update the buffer reference counts for a new outgoing message
+//
 void qd_message_add_fanout(qd_message_t *in_msg,
                            qd_message_t *out_msg)
 {
+    if (!out_msg)
+        return;
 
-    // out_msg will be 0 if we are forwarding to an internal subscriber (like
-    // $management).  If so we treat in_msg like an out_msg
-    assert(in_msg);
-    qd_message_pvt_t *msg = (qd_message_pvt_t *)((out_msg) ? out_msg : in_msg);
+    qd_message_pvt_t *msg = (qd_message_pvt_t *)out_msg;
     msg->is_fanout = true;
 
     qd_message_content_t *content = msg->content;
@@ -1119,11 +1120,19 @@ void qd_message_add_fanout(qd_message_t *in_msg,
     LOCK(content->lock);
     ++content->fanout;
 
-    // do not free the buffers until all fanout consumers are done with them
+    // do not free the buffers until all fanout senders are done with them
     qd_buffer_t *buf = DEQ_HEAD(content->buffers);
-    while (buf) {
-        qd_buffer_inc_fanout(buf);
-        buf = DEQ_NEXT(buf);
+    if (buf) {
+        // DISPATCH-1330: since we're incrementing the refcount, be sure to set
+        // the cursor to the head buf in case msg is discarded before all data
+        // is sent (we'll decref any unsent buffers at that time)
+        //
+        msg->cursor.buffer = buf;
+
+        while (buf) {
+            qd_buffer_inc_fanout(buf);
+            buf = DEQ_NEXT(buf);
+        }
     }
     UNLOCK(content->lock);
 }
@@ -1182,12 +1191,6 @@ void qd_message_set_tag_sent(qd_message_t *in_msg, bool tag_sent)
     msg->tag_sent = tag_sent;
 }
 
-qd_iterator_pointer_t qd_message_cursor(qd_message_pvt_t *in_msg)
-{
-    qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
-    return msg->cursor;
-}
-
 
 /**
  * Receive and discard large messages for which there is no destination.
diff --git a/src/message_private.h b/src/message_private.h
index 850c534..daae03a 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -147,8 +147,6 @@ ALLOC_DECLARE(qd_message_content_t);
 /** Initialize logging */
 void qd_message_initialize();
 
-qd_iterator_pointer_t qd_message_cursor(qd_message_pvt_t *msg);
-
 #define QDR_N_PRIORITIES     10
 #define QDR_MAX_PRIORITY     (QDR_N_PRIORITIES - 1)
 #define QDR_DEFAULT_PRIORITY  4


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org