You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by GitBox <gi...@apache.org> on 2019/01/10 17:55:59 UTC

[GitHub] asfgit closed pull request #436: DISPATCH-1237 - Modified link cleanup code to not release a multicast…

asfgit closed pull request #436: DISPATCH-1237 - Modified link cleanup code to not release a multicast…
URL: https://github.com/apache/qpid-dispatch/pull/436
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub would not otherwise
show its diff after the merge, so the diff is supplied below:

diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 48408ab2..3aaea807 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -373,6 +373,17 @@ size_t qd_message_fanout(qd_message_t *msg);
  */
 void qd_message_add_fanout(qd_message_t *msg);
 
+/**
+ * Increments the num_closed_receivers by 1. This is necessary to track the number of receivers that
+ * dropped out during or just before transmission of a large message.
+ */
+void qd_message_add_num_closed_receivers(qd_message_t *in_msg);
+
+/**
+ * Returns the value of num_closed_receivers
+ */
+int qd_message_num_closed_receivers(qd_message_t *msg);
+
 /**
  * Disable the Q2-holdoff for this message.
  *
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 11275d32..f7d6c514 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -705,6 +705,7 @@ void qdr_delivery_set_disposition(qdr_delivery_t *delivery, uint64_t disposition
 uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery);
 void qdr_delivery_set_aborted(const qdr_delivery_t *delivery, bool aborted);
 bool qdr_delivery_is_aborted(const qdr_delivery_t *delivery);
+void qdr_delivery_add_num_closed_receivers(qdr_delivery_t *delivery);
 
 /**
  ******************************************************************************
diff --git a/src/message.c b/src/message.c
index 16060d34..d527974a 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1077,6 +1077,22 @@ void qd_message_add_fanout(qd_message_t *in_msg)
     sys_atomic_inc(&msg->content->fanout);
 }
 
+int qd_message_num_closed_receivers(qd_message_t *in_msg)
+{
+    if (!in_msg)
+        return 0;
+    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+    return msg->content->num_closed_receivers;
+}
+
+void qd_message_add_num_closed_receivers(qd_message_t *in_msg)
+{
+    assert(in_msg);
+    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+    msg->content->num_closed_receivers++;
+}
+
+
 /**
 * There are two sources of priority information --
 * message and address. Address takes precedence, falling
@@ -1466,7 +1482,6 @@ void qd_message_send(qd_message_t *in_msg,
     qd_buffer_t          *buf     = 0;
     pn_link_t            *pnl     = qd_link_pn(link);
 
-    int                  fanout   = qd_message_fanout(in_msg);
     *restart_rx                   = false;
     *q3_stalled                   = false;
 
@@ -1606,7 +1621,8 @@ void qd_message_send(qd_message_t *in_msg,
         if (next_buf) {
             // There is a next buffer, the previous buffer has been fully sent by now.
             qd_buffer_add_fanout(buf);
-            if (fanout == qd_buffer_fanout(buf)) {
+
+            if (qd_message_fanout(in_msg) - qd_message_num_closed_receivers(in_msg) == qd_buffer_fanout(buf)) {
                 qd_buffer_t *local_buf = DEQ_HEAD(content->buffers);
                 while (local_buf && local_buf != next_buf) {
                     DEQ_REMOVE_HEAD(content->buffers);
diff --git a/src/message_private.h b/src/message_private.h
index 36585977..2402e583 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -108,6 +108,7 @@ typedef struct {
     qd_parsed_field_t   *ma_pf_trace;
     int                  ma_int_phase;
     sys_atomic_t         fanout;                         // The number of receivers for this message. This number does not include in-process subscribers.
+    int                  num_closed_receivers;
     qd_link_t           *input_link;                     // message received on this link
 
     bool                 ma_parsed;                      // have parsed annotations in incoming message
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 54a27be8..19b257c4 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -687,11 +687,27 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
         DEQ_REMOVE_HEAD(undelivered);
         peer = qdr_delivery_first_peer_CT(dlv);
         while (peer) {
-            qdr_delivery_release_CT(core, peer);
+            if (peer->multicast) {
+                //
+                // If the address of the delivery is a multicast address and if there are no receivers for this address, the peer delivery must be released.
+                //
+                // If the address of the delivery is a multicast address and there is at least one other receiver for the address, do not release the peer delivery (it is still unlinked below)
+                //
+                if (peer->link && peer->link->owning_addr && DEQ_SIZE(peer->link->owning_addr->rlinks) == 0 &&  qd_bitmask_cardinality(peer->link->owning_addr->rnodes) == 0)  {
+                    qdr_delivery_release_CT(core, peer);
+                }
+            }
+            else {
+                qdr_delivery_release_CT(core, peer);
+            }
             qdr_delivery_unlink_peers_CT(core, dlv, peer);
             peer = qdr_delivery_next_peer_CT(dlv);
         }
 
+        if (dlv->link->link_direction == QD_OUTGOING) {
+            qdr_delivery_add_num_closed_receivers(dlv);
+        }
+
         //
         // Now the undelivered-list reference
         //
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index ecb3e7da..534cc11c 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -384,11 +384,9 @@ void qdr_delivery_incref(qdr_delivery_t *delivery, const char *label)
 void qdr_delivery_set_aborted(const qdr_delivery_t *delivery, bool aborted)
 {
     assert(delivery);
-
     qd_message_set_aborted(delivery->msg, aborted);
 }
 
-
 bool qdr_delivery_is_aborted(const qdr_delivery_t *delivery)
 {
     if (!delivery)
@@ -396,6 +394,12 @@ bool qdr_delivery_is_aborted(const qdr_delivery_t *delivery)
     return qd_message_aborted(delivery->msg);
 }
 
+void qdr_delivery_add_num_closed_receivers(qdr_delivery_t *delivery)
+{
+    assert(delivery);
+    qd_message_add_num_closed_receivers(delivery->msg);
+}
+
 
 void qdr_delivery_decref(qdr_core_t *core, qdr_delivery_t *delivery, const char *label)
 {
@@ -908,6 +912,8 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
 
     int fanout = 0;
 
+    dlv->multicast = qdr_is_addr_treatment_multicast(addr);
+
     if (addr) {
         fanout = qdr_forward_message_CT(core, addr, dlv->msg, dlv, false, link->link_type == QD_LINK_CONTROL);
         if (link->link_type != QD_LINK_CONTROL && link->link_type != QD_LINK_ROUTER) {
@@ -954,7 +960,6 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
         qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (1)");
         qdr_link_issue_credit_CT(core, link, 1, false);
     } else if (fanout > 0) {
-        dlv->multicast = qdr_is_addr_treatment_multicast(addr);
         if (dlv->settled || dlv->multicast) {
             //
             // The delivery is settled.  Keep it off the unsettled list and issue
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index 2fe31903..6a90e330 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -639,6 +639,16 @@ def test_39_mobile_addr_event_three_receivers_diff_interior(self):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_40_drop_rx_client_multicast_large_message(self):
+        # test what happens if some multicast receivers close in the middle of
+        # a multiframe transfer
+        test = MobileAddrMcastDroppedRxTest(self.routers[2].addresses[0],
+                                            self.routers[2].addresses[0],
+                                            self.routers[2].addresses[0],
+                                            self.routers[2].addresses[0],
+                                            "multicast.40")
+        test.run()
+        self.assertEqual(None, test.error)
 
 
 class LinkRouteProxyTest(TestCase):
@@ -1488,6 +1498,49 @@ def on_message(self, event):
     def run(self):
         Container(self).run()
 
+class MobileAddrMcastDroppedRxTest(MobileAddressMulticastTest):
+    # failure scenario - cause some receiving clients to close while a large
+    # message is in transit
+    def __init__(self, receiver1_host, receiver2_host, receiver3_host,
+                 sender_host, address, check_addr_host=None):
+        super(MobileAddrMcastDroppedRxTest, self).__init__(receiver1_host,
+                                                           receiver2_host,
+                                                           receiver3_host,
+                                                           sender_host,
+                                                           address,
+                                                           check_addr_host=check_addr_host,
+                                                           large_msg=True)
+        self.n_accepted = 0
+        self.n_released = 0
+        self.recv1_closed = False
+        self.recv2_closed = False
+
+    def _check_done(self):
+        if self.n_accepted + self.n_released == self.count:
+            self.receiver3_conn.close()
+            self.sender_conn.close()
+            self.timer.cancel()
+
+    def on_message(self, event):
+        super(MobileAddrMcastDroppedRxTest, self).on_message(event)
+
+        # start closing receivers
+        if self.n_rcvd1 == 50:
+            if not self.recv1_closed:
+                self.receiver1_conn.close()
+                self.recv1_closed = True
+        if self.n_rcvd2 == 75:
+            if not self.recv2_closed:
+                self.recv2_closed = True
+                self.receiver2_conn.close()
+
+    def on_accepted(self, event):
+        self.n_accepted += 1
+        self._check_done()
+
+    def on_released(self, event):
+        self.n_released += 1
+        self._check_done()
 
 class MobileAddressEventTest(MessagingHandler):
     def __init__(self, receiver1_host, receiver2_host, receiver3_host,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org