You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2019/05/03 20:54:13 UTC

[qpid-dispatch] branch master updated (a02bda8 -> e568032)

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git.


    from a02bda8  NO-JIRA: use cmake STRING type to avoid implicit conversion warnings
     new 9bca5dd  DISPATCH-1330: fix Q2 stall due to msg buffer refcount error
     new e568032  DISPATCH-1330: check for Q2 stall when freeing messages

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 include/qpid/dispatch/container.h |   1 +
 src/message.c                     |  51 +++++++++-----
 src/message_private.h             |   4 +-
 src/router_node.c                 |  31 +++++----
 tests/system_tests_one_router.py  | 139 ++++++++++++++++++++++++++++++++++++++
 5 files changed, 193 insertions(+), 33 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 01/02: DISPATCH-1330: fix Q2 stall due to msg buffer refcount error

Posted by kg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 9bca5ddf0f373dff7037868da2762033bf47bbc5
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Wed May 1 10:56:50 2019 -0400

    DISPATCH-1330: fix Q2 stall due to msg buffer refcount error
    
    Also remove some dead code.
    
    This closes #498
---
 src/message.c         | 31 +++++++++++++++++--------------
 src/message_private.h |  2 --
 2 files changed, 17 insertions(+), 16 deletions(-)

diff --git a/src/message.c b/src/message.c
index c9ff14e..4434187 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1104,14 +1104,15 @@ void qd_message_set_discard(qd_message_t *msg, bool discard)
 }
 
 
+// update the buffer reference counts for a new outgoing message
+//
 void qd_message_add_fanout(qd_message_t *in_msg,
                            qd_message_t *out_msg)
 {
+    if (!out_msg)
+        return;
 
-    // out_msg will be 0 if we are forwarding to an internal subscriber (like
-    // $management).  If so we treat in_msg like an out_msg
-    assert(in_msg);
-    qd_message_pvt_t *msg = (qd_message_pvt_t *)((out_msg) ? out_msg : in_msg);
+    qd_message_pvt_t *msg = (qd_message_pvt_t *)out_msg;
     msg->is_fanout = true;
 
     qd_message_content_t *content = msg->content;
@@ -1119,11 +1120,19 @@ void qd_message_add_fanout(qd_message_t *in_msg,
     LOCK(content->lock);
     ++content->fanout;
 
-    // do not free the buffers until all fanout consumers are done with them
+    // do not free the buffers until all fanout senders are done with them
     qd_buffer_t *buf = DEQ_HEAD(content->buffers);
-    while (buf) {
-        qd_buffer_inc_fanout(buf);
-        buf = DEQ_NEXT(buf);
+    if (buf) {
+        // DISPATCH-1330: since we're incrementing the refcount be sure to set
+        // the cursor to the head buf in case msg is discarded before all data
+        // is sent (we'll decref any unsent buffers at that time)
+        //
+        msg->cursor.buffer = buf;
+
+        while (buf) {
+            qd_buffer_inc_fanout(buf);
+            buf = DEQ_NEXT(buf);
+        }
     }
     UNLOCK(content->lock);
 }
@@ -1182,12 +1191,6 @@ void qd_message_set_tag_sent(qd_message_t *in_msg, bool tag_sent)
     msg->tag_sent = tag_sent;
 }
 
-qd_iterator_pointer_t qd_message_cursor(qd_message_pvt_t *in_msg)
-{
-    qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
-    return msg->cursor;
-}
-
 
 /**
  * Receive and discard large messages for which there is no destination.
diff --git a/src/message_private.h b/src/message_private.h
index 850c534..daae03a 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -147,8 +147,6 @@ ALLOC_DECLARE(qd_message_content_t);
 /** Initialize logging */
 void qd_message_initialize();
 
-qd_iterator_pointer_t qd_message_cursor(qd_message_pvt_t *msg);
-
 #define QDR_N_PRIORITIES     10
 #define QDR_MAX_PRIORITY     (QDR_N_PRIORITIES - 1)
 #define QDR_DEFAULT_PRIORITY  4


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 02/02: DISPATCH-1330: check for Q2 stall when freeing messages

Posted by kg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit e5680326cfa93b7dbcb440aa0081c2ed34df772e
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Thu May 2 11:51:59 2019 -0400

    DISPATCH-1330: check for Q2 stall when freeing messages
    
    This closes #500
---
 include/qpid/dispatch/container.h |   1 +
 src/message.c                     |  20 ++++--
 src/message_private.h             |   2 +-
 src/router_node.c                 |  31 +++++----
 tests/system_tests_one_router.py  | 139 ++++++++++++++++++++++++++++++++++++++
 5 files changed, 176 insertions(+), 17 deletions(-)

diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h
index fd35a48..bfe1da4 100644
--- a/include/qpid/dispatch/container.h
+++ b/include/qpid/dispatch/container.h
@@ -197,6 +197,7 @@ void qd_link_detach(qd_link_t *link);
 bool qd_link_drain_changed(qd_link_t *link, bool *mode);
 void qd_link_free(qd_link_t *link);
 void *qd_link_get_node_context(const qd_link_t *link);
+void qd_link_restart_rx(qd_link_t *link);
 
 ///@}
 #endif
diff --git a/src/message.c b/src/message.c
index 4434187..95330a5 100644
--- a/src/message.c
+++ b/src/message.c
@@ -935,6 +935,7 @@ void qd_message_free(qd_message_t *in_msg)
         //
         LOCK(content->lock);
 
+        const bool was_blocked = !qd_message_Q2_holdoff_should_unblock(in_msg);
         qd_buffer_t *buf = msg->cursor.buffer;
         while (buf) {
             qd_buffer_t *next_buf = DEQ_NEXT(buf);
@@ -946,6 +947,17 @@ void qd_message_free(qd_message_t *in_msg)
         }
         --content->fanout;
 
+        //
+        // it is possible that we've freed enough buffers to clear Q2 holdoff
+        //
+        if (content->q2_input_holdoff
+            && was_blocked
+            && qd_message_Q2_holdoff_should_unblock(in_msg)) {
+
+            content->q2_input_holdoff = false;
+            qd_link_restart_rx(qd_message_get_receiving_link(in_msg));
+        }
+
         UNLOCK(content->lock);
     }
 
@@ -1215,7 +1227,7 @@ qd_message_t *discard_receive(pn_delivery_t *delivery,
             // end of message or error. Call the message complete
             msg->content->receive_complete = true;
             msg->content->aborted = pn_delivery_aborted(delivery);
-            msg->content->input_link = 0;
+            qd_nullify_safe_ptr(&msg->content->input_link_sp);
 
             pn_record_t *record = pn_delivery_attachments(delivery);
             pn_record_set(record, PN_DELIVERY_CTX, 0);
@@ -1256,7 +1268,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
         msg = (qd_message_pvt_t*) qd_message();
         qd_link_t       *qdl = (qd_link_t *)pn_link_get_context(link);
         qd_connection_t *qdc = qd_link_connection(qdl);
-        msg->content->input_link = pn_link_get_context(link);
+        set_safe_ptr_qd_link_t(pn_link_get_context(link), &msg->content->input_link_sp);
         msg->strip_annotations_in  = qd_connection_strip_annotations_in(qdc);
         pn_record_def(record, PN_DELIVERY_CTX, PN_WEAKREF);
         pn_record_set(record, PN_DELIVERY_CTX, (void*) msg);
@@ -1315,7 +1327,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
 
                 content->receive_complete = true;
                 content->aborted = pn_delivery_aborted(delivery);
-                content->input_link = 0;
+                qd_nullify_safe_ptr(&content->input_link_sp);
 
                 // unlink message and delivery
                 pn_record_set(record, PN_DELIVERY_CTX, 0);
@@ -2115,7 +2127,7 @@ bool qd_message_Q2_holdoff_should_unblock(qd_message_t *msg)
 
 qd_link_t * qd_message_get_receiving_link(const qd_message_t *msg)
 {
-    return ((qd_message_pvt_t *)msg)->content->input_link;
+    return safe_deref_qd_link_t(((qd_message_pvt_t *)msg)->content->input_link_sp);
 }
 
 
diff --git a/src/message_private.h b/src/message_private.h
index daae03a..3368ec7 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -110,7 +110,7 @@ typedef struct {
     qd_parsed_field_t   *ma_pf_trace;
     int                  ma_int_phase;
     uint32_t             fanout;                         // The number of receivers for this message, including in-process subscribers.
-    qd_link_t           *input_link;                     // message received on this link
+    qd_link_t_sp         input_link_sp;                  // message received on this link
 
     bool                 ma_parsed;                      // have parsed annotations in incoming message
     bool                 discard;                        // Should this message be discarded?
diff --git a/src/router_node.c b/src/router_node.c
index 4b0df66..c898379 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1564,15 +1564,7 @@ static uint64_t CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_
         qdr_link_stalled_outbound(link);
 
     if (restart_rx) {
-        qd_link_t *qdl_in = qd_message_get_receiving_link(msg_out);
-        if (qdl_in) {
-            qd_connection_t *qdc_in = qd_link_connection(qdl_in);
-            if (qdc_in) {
-                qd_link_t_sp *safe_ptr = NEW(qd_link_t_sp);
-                set_safe_ptr_qd_link_t(qdl_in, safe_ptr);
-                qd_connection_invoke_deferred(qdc_in, deferred_AMQP_rx_handler, safe_ptr);
-            }
-        }
+        qd_link_restart_rx(qd_message_get_receiving_link(msg_out));
     }
 
     bool send_complete = qdr_delivery_send_complete(dlv);
@@ -1694,9 +1686,7 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di
                 qdr_delivery_set_disposition(dlv, disp);
                 qd_message_set_discard(msg, true);
                 qd_message_Q2_holdoff_disable(msg);
-                qd_link_t_sp *safe_ptr = NEW(qd_link_t_sp);
-                set_safe_ptr_qd_link_t(link, safe_ptr);
-                qd_connection_invoke_deferred(qd_conn, deferred_AMQP_rx_handler, safe_ptr);
+                qd_link_restart_rx(link);
             }
         }
     }
@@ -1756,3 +1746,20 @@ qdr_core_t *qd_router_core(qd_dispatch_t *qd)
     return qd->router->router_core;
 }
 
+
+// called when Q2 holdoff is deactivated so we can receive more message buffers
+//
+void qd_link_restart_rx(qd_link_t *in_link)
+{
+    if (!in_link)
+        return;
+
+    assert(qd_link_direction(in_link) == QD_INCOMING);
+
+    qd_connection_t *in_conn = qd_link_connection(in_link);
+    if (in_conn) {
+        qd_link_t_sp *safe_ptr = NEW(qd_link_t_sp);
+        set_safe_ptr_qd_link_t(in_link, safe_ptr);
+        qd_connection_invoke_deferred(in_conn, deferred_AMQP_rx_handler, safe_ptr);
+    }
+}
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 1005912..79f6a15 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -31,6 +31,7 @@ from proton.utils import BlockingConnection, SyncRequestResponse
 from qpid_dispatch.management.client import Node
 import os, json
 from subprocess import PIPE, STDOUT
+from time import sleep
 
 CONNECTION_PROPERTIES_UNICODE_STRING = {u'connection': u'properties', u'int_property': 6451}
 CONNECTION_PROPERTIES_SYMBOL = dict()
@@ -491,6 +492,16 @@ class OneRouterTest(TestCase):
         # because we do not have the policy permission to do so.
         self.assertTrue(passed)
 
+    def test_45_q2_holdoff_drop_stalled_rx(self):
+        """
+        Verify that dropping a slow consumer while in Q2 flow control does
+        not hang the router
+        """
+        test = Q2HoldoffDropTest(self.router)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
 
 class Entity(object):
     def __init__(self, status_code, status_description, attrs):
@@ -3101,5 +3112,133 @@ class UnsettledLargeMessageTest(MessagingHandler):
             self.recv_conn.close()
 
 
+class Q2HoldoffDropTest(MessagingHandler):
+    """
+    Create 3 multicast receivers, two which grant 2 credits and one that grants
+    only one.  Send enough data to force Q2 holdoff (since one rx is blocked)
+    Close the stalled rx connection, verify the remaining receivers get the
+    message (Q2 holdoff disabled)
+    """
+    def __init__(self, router):
+        super(Q2HoldoffDropTest, self).__init__(prefetch=0,
+                                                auto_accept=False,
+                                                auto_settle=False)
+        self.router = router
+        self.rx_fast1_conn = None
+        self.rx_fast1 = None
+        self.rx_fast2_conn = None
+        self.rx_fast2 = None
+        self.rx_slow_conn = None
+        self.rx_slow = None
+        self.tx_conn = None
+        self.tx = None
+        self.timer = None        # overall test timeout, scheduled in on_start
+        self.reactor = None
+        self.error = None        # set to a message string on failure
+        self.n_attached = 0      # receiver links opened so far
+        self.n_rx = 0            # on_message callbacks seen so far
+        self.n_tx = 0            # messages sent so far
+        self.close_timer = None  # delayed close of rx_slow, set in on_message
+
+        # currently the router buffer size is 512 bytes and the Q2 holdoff
+        # buffer chain high watermark is 256 buffers.  We need to send a
+        # message that will be big enough to trigger Q2 holdoff
+        self.big_msg = Message(body=["DISPATCH-1330" * (512 * 256 * 4)])
+
+    def done(self):
+        if self.timer:
+            self.timer.cancel()
+        if self.close_timer:
+            self.close_timer.cancel()
+        if self.tx_conn:
+            self.tx_conn.close()
+        if self.rx_fast1_conn:
+            self.rx_fast1_conn.close()
+        if self.rx_fast2_conn:
+            self.rx_fast2_conn.close()
+        if self.rx_slow_conn:
+            self.rx_slow_conn.close()
+
+    def timeout(self):
+        self.error = "Timeout Expired"
+        self.done()
+
+    def on_start(self, event):
+        self.reactor = event.reactor
+        self.timer = self.reactor.schedule(TIMEOUT, Timeout(self))
+
+        self.rx_slow_conn = event.container.connect(self.router.addresses[0])
+        self.rx_fast1_conn = event.container.connect(self.router.addresses[0])
+        self.rx_fast2_conn = event.container.connect(self.router.addresses[0])
+
+        self.rx_slow = event.container.create_receiver(self.rx_slow_conn,
+                                                       source="multicast.dispatch-1330",
+                                                       name="rx_slow")
+        self.rx_fast1 = event.container.create_receiver(self.rx_fast1_conn,
+                                                        source="multicast.dispatch-1330",
+                                                        name="rx_fast1")
+        self.rx_fast2 = event.container.create_receiver(self.rx_fast2_conn,
+                                                        source="multicast.dispatch-1330",
+                                                        name="rx_fast2")
+
+    def on_link_opened(self, event):
+        if event.receiver:
+            self.n_attached += 1
+            if self.n_attached == 3:
+                self.rx_fast1.flow(2)
+                self.rx_fast2.flow(2)
+                self.rx_slow.flow(1)   # stall on 2nd msg
+
+                self.tx_conn = event.container.connect(self.router.addresses[0])
+                self.tx = event.container.create_sender(self.tx_conn,
+                                                        target="multicast.dispatch-1330",
+                                                        name="tx")
+
+    def on_sendable(self, event):
+        if self.n_tx == 0:
+            # wait until all subscribers present
+            self.router.wait_address("multicast.dispatch-1330", subscribers=3)
+            for i in range(2):
+                dlv = self.tx.send(self.big_msg)
+                dlv.settle()
+                self.n_tx += 1
+
+    def close_rx_slow(self, event):
+        if self.rx_slow_conn:
+            self.rx_slow_conn.close()
+            self.rx_slow_conn = None
+            self.rx_slow = None
+
+    def on_message(self, event):
+        self.n_rx += 1
+        if self.n_rx == 3: # first will arrive, second is blocked
+
+            class CloseTimer(Timeout):
+                def on_timer_task(self, event):
+                    self.parent.close_rx_slow(event)
+
+            # 2 second wait for Q2 to fill up
+            self.close_timer = self.reactor.schedule(2.0, CloseTimer(self))
+
+        if self.n_rx == 5:
+            # successfully received on last two receivers
+            self.done()
+
+    def run(self):
+        """Run the container, then wait for the router to drop the address."""
+        Container(self).run()
+
+        # wait until the router has cleaned up the route table: keep polling
+        # the management address list while any entry still mentions
+        # "dispatch-1330".  The number of polls is capped (about TIMEOUT
+        # seconds of sleeping in total) so a stuck router cannot hang the test.
+        atype = 'org.apache.qpid.dispatch.router.address'
+        for _ in range(int(TIMEOUT / 0.1)):
+            addrs = self.router.management.query(type=atype).get_dicts()
+            if not any("dispatch-1330" in a['name'] for a in addrs):
+                break
+            sleep(0.1)
+
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org