You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2019/04/03 20:01:14 UTC

[qpid-dispatch] 02/02: DISPATCH-1302: fix settlement propagation race

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 425edb6ecba873f547feb279a18220a2be24e3de
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Tue Apr 2 11:10:51 2019 -0400

    DISPATCH-1302: fix settlement propagation race
    
    This closes #479
---
 src/message.c                   |   1 -
 src/router_core/transfer.c      |  27 ++-
 src/router_node.c               |  13 +-
 tests/CMakeLists.txt            |   1 +
 tests/system_tests_multicast.py | 500 ++++++++++++++++++++++++++++++++++++++++
 5 files changed, 528 insertions(+), 14 deletions(-)

diff --git a/src/message.c b/src/message.c
index 36d7d09..1b3363e 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1627,7 +1627,6 @@ void qd_message_send(qd_message_t *in_msg,
     }
 
     buf = msg->cursor.buffer;
-    assert (buf);
 
     pn_session_t     *pns  = pn_link_session(pnl);
 
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index b186c09..7b1af31 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -163,11 +163,21 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
         while (credit > 0) {
             sys_mutex_lock(conn->work_lock);
             dlv = DEQ_HEAD(link->undelivered);
-            sys_mutex_unlock(conn->work_lock);
             if (dlv) {
-                settled = dlv->settled;
-                uint64_t new_disp = core->deliver_handler(core->user_context, link, dlv, settled);
-                sys_mutex_lock(conn->work_lock);
+                uint64_t new_disp = 0;
+                // DISPATCH-1302 race hack fix: There is a race between the CORE thread
+                // and the outbound (this) thread over settlement. It occurs when the CORE
+                // thread is trying to propagate settlement to a peer (this delivery)
+                // while this thread is in core->deliver_handler.  This can result in the
+                // CORE thread NOT pushing the peer delivery change since it is not yet off of
+                // the undelivered list, while this thread does not settle because it missed
+                // the settled flag update.
+                do {
+                    settled = dlv->settled;
+                    sys_mutex_unlock(conn->work_lock);
+                    new_disp = core->deliver_handler(core->user_context, link, dlv, settled);
+                    sys_mutex_lock(conn->work_lock);
+                } while (settled != dlv->settled);  // oops missed the settlement
                 send_complete = qdr_delivery_send_complete(dlv);
                 if (send_complete) {
                     //
@@ -179,13 +189,16 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
                     credit--;
                     link->credit_to_core--;
                     link->total_deliveries++;
-                    offer = DEQ_SIZE(link->undelivered);
 
+                    // DISPATCH-1153:
+                    // If the undelivered list is cleared the link may have detached.  Stop processing.
+                    offer = DEQ_SIZE(link->undelivered);
                     if (offer == 0) {
                         sys_mutex_unlock(conn->work_lock);
                         return num_deliveries_completed;
                     }
 
+                    assert(dlv == DEQ_HEAD(link->undelivered));
                     DEQ_REMOVE_HEAD(link->undelivered);
                     dlv->link_work = 0;
 
@@ -223,8 +236,10 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
                 if (new_disp)
                     qdr_delivery_update_disposition(((qd_router_t *)core->user_context)->router_core,
                                                     dlv, new_disp, true, 0, 0, false);
-            } else
+            } else {
+                sys_mutex_unlock(conn->work_lock);
                 break;
+            }
         }
 
         if (offer != -1)
diff --git a/src/router_node.c b/src/router_node.c
index 3639ddb..2114ce8 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1523,17 +1523,16 @@ static uint64_t CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_
         //
         // If the remote send settle mode is set to 'settled', we should settle the delivery on behalf of the receiver.
         //
-        if (!settled && !remote_snd_settled) {
-            if (qdr_delivery_get_context(dlv) == 0)
-                qdr_node_connect_deliveries(qlink, dlv, pdlv);
-        }
+        if (qdr_delivery_get_context(dlv) == 0)
+            qdr_node_connect_deliveries(qlink, dlv, pdlv);
 
         qdr_delivery_set_tag_sent(dlv, true);
+    } else {
+        pdlv = qdr_node_delivery_pn_from_qdr(dlv);
     }
 
-    if (!pdlv) {
-        pdlv = pn_link_current(plink);
-    }
+    if (!pdlv)
+        return 0;
 
     bool restart_rx = false;
     bool q3_stalled = false;
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index e3e5cb0..e76c81c 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -127,6 +127,7 @@ foreach(py_test_module
     system_tests_core_client
     system_tests_address_lookup
     system_tests_multi_phase
+    system_tests_multicast
     )
 
   add_test(${py_test_module} ${TEST_WRAP} unit2 -v ${py_test_module})
diff --git a/tests/system_tests_multicast.py b/tests/system_tests_multicast.py
new file mode 100644
index 0000000..c82ec18
--- /dev/null
+++ b/tests/system_tests_multicast.py
@@ -0,0 +1,500 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Test the multicast forwarder
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+from time import sleep
+import unittest2 as unittest
+
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
+from proton.reactor import LinkOption
+from proton import Connection
+from proton import Link
+from proton import Message
+from proton import Delivery
+from qpid_dispatch.management.client import Node
+from system_test import TestCase
+from system_test import Qdrouterd
+from system_test import main_module
+from system_test import TIMEOUT
+
+
+class TestTimeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
+
+
+MAX_FRAME=1025
+W_THREADS=2
+
+# check for leaks of the following entities
+ALLOC_STATS=["qd_message_t",
+             "qd_buffer_t",
+             "qdr_delivery_t"]
+
+class MulticastLinearTest(TestCase):
+    """
+    Verify the multicast forwarding logic across a multihop linear router
+    configuration
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        """Start the four routers of the linear test topology"""
+        super(MulticastLinearTest, cls).setUpClass()
+
+        def router(name, mode, extra):
+            config = [
+                ('router', {'mode': mode,
+                            'id': name,
+                            'allowUnsettledMulticast': 'yes',
+                            'workerThreads': W_THREADS}),
+                ('listener', {'role': 'normal',
+                              'port': cls.tester.get_port(),
+                              'maxFrameSize': MAX_FRAME}),
+                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+            ]
+
+            if extra:
+                config.extend(extra)
+            config = Qdrouterd.Config(config)
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+            return cls.routers[-1]
+
+        # configuration:
+        # two edge routers connected via 2 interior routers.
+        #
+        #  +-------+    +---------+    +---------+    +-------+
+        #  |  EA1  |<==>|  INT.A  |<==>|  INT.B  |<==>|  EB1  |
+        #  +-------+    +---------+    +---------+    +-------+
+        #
+        # Client attachments are listed in cls.config below: one multicast
+        # sender on EA1 and one multicast receiver on EB1
+
+        cls.routers = []
+
+        interrouter_port = cls.tester.get_port()
+        cls.INTA_edge_port   = cls.tester.get_port()
+        cls.INTB_edge_port   = cls.tester.get_port()
+
+        router('INT.A', 'interior',
+               [('listener', {'role': 'inter-router',
+                              'port': interrouter_port}),
+                ('listener', {'role': 'edge', 'port': cls.INTA_edge_port})])
+        cls.INT_A = cls.routers[0]
+        cls.INT_A.listener = cls.INT_A.addresses[0]
+
+        router('INT.B', 'interior',
+               [('connector', {'name': 'connectorToA',
+                               'role': 'inter-router',
+                               'port': interrouter_port}),
+                ('listener', {'role': 'edge',
+                              'port': cls.INTB_edge_port})])
+
+        cls.INT_B = cls.routers[1]
+        cls.INT_B.listener = cls.INT_B.addresses[0]
+
+        router('EA1', 'edge',
+               [('listener', {'name': 'rc', 'role': 'route-container',
+                              'port': cls.tester.get_port()}),
+                ('connector', {'name': 'uplink', 'role': 'edge',
+                               'port': cls.INTA_edge_port}),
+                ('linkRoute', {'prefix': 'CfgLinkRoute1', 'containerId': 'FakeBroker', 'direction': 'in'}),
+                ('linkRoute', {'prefix': 'CfgLinkRoute1', 'containerId': 'FakeBroker', 'direction': 'out'})])
+        cls.EA1 = cls.routers[2]
+        cls.EA1.listener = cls.EA1.addresses[0]
+        cls.EA1.route_container = cls.EA1.addresses[1]
+
+        router('EB1', 'edge',
+               [('connector', {'name': 'uplink',
+                               'role': 'edge',
+                               'port': cls.INTB_edge_port,
+                               'maxFrameSize': 1024}),
+                ('listener', {'name': 'rc', 'role': 'route-container',
+                              'port': cls.tester.get_port()}),
+                ('linkRoute', {'pattern': '*.cfg.pattern.#', 'containerId': 'FakeBroker', 'direction': 'in'}),
+                ('linkRoute', {'pattern': '*.cfg.pattern.#', 'containerId': 'FakeBroker', 'direction': 'out'})])
+        cls.EB1 = cls.routers[3]
+        cls.EB1.listener = cls.EB1.addresses[0]
+        cls.EB1.route_container = cls.EB1.addresses[1]
+
+        cls.INT_A.wait_router_connected('INT.B')
+        cls.INT_B.wait_router_connected('INT.A')
+        cls.EA1.wait_connectors()
+        cls.EB1.wait_connectors()
+
+        # Client topology:
+        # edge router EB1 has the only multicast receiver
+        # edge router EA1 has the only multicast sender
+        #
+        cls.config = [
+            # edge router EA1:
+            {'router':      cls.EA1,
+             'senders':     ['S-EA1-1'],
+             'receivers':   [],
+             'subscribers': 1,
+             'remotes':     0
+            },
+            # Interior router INT_A:
+            {'router':      cls.INT_A,
+             'senders':     [],
+             # 'receivers':   ['R-INT_A-1'],
+             'receivers':   [],
+             'subscribers': 0,
+             'remotes':     1,
+            },
+            # Interior router INT_B:
+            {'router':      cls.INT_B,
+             'senders':     [],
+             'receivers':   [],
+             'subscribers': 1,
+             'remotes':     0,
+            },
+            # edge router EB1
+            {'router':      cls.EB1,
+             'senders':     [],
+             'receivers':   ['R-EB1-1'],
+             'subscribers': 1,
+             'remotes':     0,
+            }
+        ]
+
+
+    def _get_alloc_stats(self, router, stats):
+        # return a map of the current allocator counters for each entity type
+        # name in stats
+
+        #
+        # 57: END = [{u'heldByThreads': int32(384), u'typeSize': int32(536),
+        # u'transferBatchSize': int32(64), u'globalFreeListMax': int32(0),
+        # u'batchesRebalancedToGlobal': int32(774), u'typeName':
+        # u'qd_buffer_t', u'batchesRebalancedToThreads': int32(736),
+        # u'totalFreeToHeap': int32(0), u'totalAllocFromHeap': int32(2816),
+        # u'localFreeListMax': int32(128), u'type':
+        # u'org.apache.qpid.dispatch.allocator', u'identity':
+        # u'allocator/qd_buffer_t', u'name': u'allocator/qd_buffer_t'}]
+
+        d = dict()
+        mgmt = router.management
+        atype = 'org.apache.qpid.dispatch.allocator'
+        q = mgmt.query(type=atype).get_dicts()
+        for name in stats:
+            d[name] = filter(lambda a: a['typeName'] == name, q)[0]
+        return d
+
+    def test_51_maybe_presettled_large_msg(self):
+        body = " MCAST MAYBE PRESETTLED LARGE "
+        body += "X" * (MAX_FRAME * 19)
+        for repeat in range(5):
+            test = MulticastPresettled(self.config, 100, body, SendMaybePresettled())
+            test.run()
+            self.assertEqual(None, test.error)
+
+    def test_51_presettled_large_msg(self):
+        body = " MCAST PRESETTLED LARGE "
+        body += "X" * (MAX_FRAME * 23)
+        for repeat in range(5):
+            test = MulticastPresettled(self.config, 100, body, SendMustBePresettled())
+            test.run()
+            self.assertEqual(None, test.error)
+
+    def _check_for_leaks(self):
+        for r in self.routers:
+            stats = self._get_alloc_stats(r, ALLOC_STATS)
+            for name in ALLOC_STATS:
+                # flag a leak if threads hold far more items than their free lists allow
+                max_allowed  = ((W_THREADS + 1)
+                                * stats[name]['localFreeListMax'])
+                held = stats[name]['heldByThreads']
+                import sys; sys.stdout.flush()
+                if held >= (2 * max_allowed):
+                    print("OOPS!!! %s: (%s) - held=%d max=%d\n   %s\n"
+                          % (r.config.router_id,
+                             name, held, max_allowed, stats))
+                    import sys; sys.stdout.flush()
+                    self.assertFalse(held >= (2 * max_allowed))
+
+    def test_999_check_for_leaks(self):
+        self._check_for_leaks()
+
+
+class SendMaybePresettled(LinkOption):
+    """
+    Set the default send settlement modes on link negotiation to mixed
+    """
+    def apply(self, link):
+        link.snd_settle_mode = Link.SND_MIXED
+        link.rcv_settle_mode = Link.RCV_FIRST
+
+
+class SendMustBePresettled(LinkOption):
+    """
+    Set the default send settlement modes on a link to presettled
+    """
+    def apply(self, link):
+        link.snd_settle_mode = Link.SND_SETTLED
+        link.rcv_settle_mode = Link.RCV_FIRST
+
+
+class MulticastBase(MessagingHandler):
+    def __init__(self, config, count, body, topic=None, **handler_kwargs):
+        super(MulticastBase, self).__init__(**handler_kwargs)
+        self.msg_count = count
+        self.config = config
+        self.topic = topic or "whatevahcast/test"
+        self.body = body
+
+        # totals
+        self.n_senders = 0;
+        self.n_receivers = 0;
+        self.n_sent = 0
+        self.n_received = 0
+        self.n_settled = 0
+        self.n_accepted = 0
+        self.n_released = 0
+        self.n_rejected = 0
+        self.n_modified = 0
+        self.n_partial = 0
+
+        # all maps indexed by client name:
+        self.receivers = {}
+        self.senders = {}
+        self.r_conns = {}
+        self.s_conns = {}
+
+        # per receiver
+        self.c_received = {}
+
+        # count per outcome
+        self.n_outcomes = {}
+
+        # self.c_accepted = {}
+        # self.c_released = {}
+        # self.c_rejected = {}
+        # self.c_modified = {}
+        # self.c_settled  = {}
+
+        self.error = None
+        self.timers = []
+        self.reactor = None
+
+    def done(self):
+        # stop the reactor and clean up the test
+        for t in self.timers:
+            t.cancel()
+        for c_dict in [self.r_conns, self.s_conns]:
+            for conn in c_dict.values():
+                conn.close()
+        self.r_conns = {}
+        self.s_conns = {}
+
+    def timeout(self):
+        self.error = "Timeout Expired"
+        self.done()
+
+    def create_receiver(self, container, conn, source, name):
+        # must override in subclass
+        assert(False)
+
+    def create_sender(self, container, conn, target, name):
+        # must override in subclass
+        assert(False)
+
+    def on_start(self, event):
+        self.reactor = event.reactor
+        self.timers.append(self.reactor.schedule(TIMEOUT, TestTimeout(self)))
+        # create all the receivers first; senders are created once they open
+        for cfg in self.config:
+            for name in cfg['receivers']:
+                conn = event.container.connect(cfg['router'].listener)
+                assert(name not in self.r_conns)
+                self.r_conns[name] = conn
+                self.create_receiver(event.container, conn, self.topic, name)
+                self.n_receivers += 1
+
+    def on_link_opened(self, event):
+        if event.receiver:
+            r_name = event.receiver.name
+            self.receivers[r_name] = event.receiver
+            # create senders after all receivers are opened
+            # makes it easy to check when the clients are ready
+            if len(self.receivers) == self.n_receivers:
+                for cfg in self.config:
+                    for name in cfg['senders']:
+                        conn = event.container.connect(cfg['router'].listener)
+                        assert(name not in self.s_conns)
+                        self.s_conns[name] = conn
+                        self.create_sender(event.container, conn, self.topic, name)
+                        self.n_senders += 1
+
+    def on_sendable(self, event):
+        s_name = event.sender.name
+        if s_name not in self.senders:
+            self.senders[s_name] = event.sender
+            if len(self.senders) == self.n_senders:
+                # all senders ready to send, now wait until the routes settle
+                for cfg in self.config:
+                    cfg['router'].wait_address(self.topic,
+                                               subscribers=cfg['subscribers'],
+                                               remotes=cfg['remotes'])
+                for sender in self.senders.values():
+                    self.do_send(sender)
+
+    def on_message(self, event):
+        if event.delivery.partial:
+            self.n_partial += 1
+        else:
+            dlv = event.delivery
+            self.n_received += 1
+            name = event.link.name
+            self.c_received[name] = 1 + self.c_received.get(name, 0)
+
+    def on_accepted(self, event):
+        self.n_accepted += 1
+        name = event.link.name
+        self.n_outcomes[Delivery.ACCEPTED] = 1 + self.n_outcomes.get(Delivery.ACCEPTED, 0)
+
+    def on_released(self, event):
+        # for some reason Proton 'helpfully' calls on_released even though the
+        # delivery state is actually MODIFIED
+        if event.delivery.remote_state == Delivery.MODIFIED:
+            return self.on_modified(event)
+        self.n_released += 1
+        name = event.link.name
+        self.n_outcomes[Delivery.RELEASED] = 1 + self.n_outcomes.get(Delivery.RELEASED, 0)
+
+    def on_modified(self, event):
+        self.n_modified += 1
+        name = event.link.name
+        self.n_outcomes[Delivery.MODIFIED] = 1 + self.n_outcomes.get(Delivery.MODIFIED, 0)
+
+    def on_rejected(self, event):
+        self.n_rejected += 1
+        name = event.link.name
+        self.n_outcomes[Delivery.REJECTED] = 1 + self.n_outcomes.get(Delivery.REJECTED, 0)
+
+    def on_settled(self, event):
+        self.n_settled += 1
+        name = event.link.name
+
+    def run(self):
+        Container(self).run()
+
+        # wait until all routers have cleaned up the route tables
+        clean = False
+        while not clean:
+            clean = True
+            for cfg in self.config:
+                mgmt = cfg['router'].management
+                atype = 'org.apache.qpid.dispatch.router.address'
+                addrs = mgmt.query(type=atype).get_dicts()
+                if list(filter(lambda a: a['name'].find(self.topic) != -1, addrs)):
+                    clean = False
+                    break
+            if not clean:
+                sleep(0.1)
+
+
+class MulticastPresettled(MulticastBase):
+    """
+    Test multicast forwarding for presettled transfers.
+    Verifies that all messages are settled by the sender
+    """
+    def __init__(self, config, count, body, settlement_mode):
+        # use a large prefetch to prevent drops
+        super(MulticastPresettled, self).__init__(config,
+                                                  count,
+                                                  body,
+                                                  prefetch=(count * 1024),
+                                                  auto_accept=False,
+                                                  auto_settle=False)
+        self.settlement_mode = settlement_mode
+        self.unexpected_unsettled = 0
+        self.expected_settled = 0
+        self.sender_settled = 0
+        self.done_count = 0
+        self.unsettled_deliveries = dict()
+
+    def create_receiver(self, container, conn, source, name):
+        return container.create_receiver(conn, source=source, name=name,
+                                         options=self.settlement_mode)
+
+    def create_sender(self, container, conn, target, name):
+        return container.create_sender(conn, target=target, name=name,
+                                       options=self.settlement_mode)
+
+    def do_send(self, sender):
+        for i in range(self.msg_count):
+            msg = Message(body=" %s -> %s:%s" % (sender.name, i, self.body))
+            dlv = sender.send(msg)
+            # settled before sending out the message
+            dlv.settle()
+            self.n_sent += 1
+
+    def check_if_done(self):
+        # finish when the total number of messages received equals
+        # n_senders * msg_count and every unsettled delivery has since
+        # been settled by the sender
+        to_rcv = self.n_senders * self.msg_count
+        if to_rcv == self.n_received and not self.unsettled_deliveries:
+            self.done()
+
+    def on_message(self, event):
+        super(MulticastPresettled, self).on_message(event)
+        if event.receiver:
+            if not event.delivery.settled:
+                event.delivery.update(Delivery.ACCEPTED)
+                self.unexpected_unsettled += 1
+                tag = str(event.delivery.tag)
+                if tag not in self.unsettled_deliveries:
+                    self.unsettled_deliveries[tag] = 1
+                else:
+                    self.unsettled_deliveries[tag] += 1
+            else:
+                self.expected_settled += 1
+            event.receiver.flow(100)
+        self.check_if_done()
+
+    def on_settled(self, event):
+        super(MulticastPresettled, self).on_settled(event)
+        if event.receiver:
+            self.sender_settled += 1
+            tag = str(event.delivery.tag)
+            try:
+                self.unsettled_deliveries[tag] -= 1
+                if self.unsettled_deliveries[tag] == 0:
+                    del self.unsettled_deliveries[tag]
+            except KeyError:
+                pass
+            self.check_if_done()
+
+
+if __name__ == '__main__':
+    unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org