You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/05/29 20:28:17 UTC

qpid-dispatch git commit: DISPATCH-1012 - For anycast addresses, deliveries to an address that has no attached destinations shall be released by the router network and credit not replenished to the sender. Once a destination becomes reachable, the sender's credit shall be topped back up to the original capacity.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master c728d7f41 -> ab4421db6


DISPATCH-1012 - For anycast addresses, deliveries to an address that has no attached destinations shall be released by the router network and credit not replenished to the sender.  Once a destination becomes reachable, the sender's credit shall be topped back up to the original capacity.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ab4421db
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ab4421db
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ab4421db

Branch: refs/heads/master
Commit: ab4421db6b1371e5ba902bc441ccc332352e7516
Parents: c728d7f
Author: Ted Ross <tr...@redhat.com>
Authored: Tue May 29 16:25:00 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue May 29 16:25:00 2018 -0400

----------------------------------------------------------------------
 src/router_core/connections.c         |   2 +
 src/router_core/router_core_private.h |   2 +-
 src/router_core/transfer.c            |  56 ++++++------
 tests/system_tests_one_router.py      | 132 ++++++++++++++++++-----------
 4 files changed, 114 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ab4421db/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 0e655d3..5fdc3bf 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -442,6 +442,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
     strcpy(link->name, name);
     link->link_direction = dir;
     link->capacity       = conn->link_capacity;
+    link->credit_pending = conn->link_capacity;
     link->admin_enabled  = true;
     link->oper_status    = QDR_LINK_OPER_DOWN;
 
@@ -867,6 +868,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t       *core,
     link->link_type      = link_type;
     link->link_direction = dir;
     link->capacity       = conn->link_capacity;
+    link->credit_pending = conn->link_capacity;
     link->name           = (char*) malloc(QDR_DISCRIMINATOR_SIZE + 8);
     link->disambiguated_name = 0;
     link->terminus_addr  = 0;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ab4421db/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index b3c9798..8017c67 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -398,10 +398,10 @@ struct qdr_link_t {
     qdr_link_oper_status_t   oper_status;
     int                      capacity;
     int                      credit_to_core; ///< Number of the available credits incrementally given to the core
+    int                      credit_pending; ///< Number of credits to be issued once consumers are available
     bool                     admin_enabled;
     bool                     strip_annotations_in;
     bool                     strip_annotations_out;
-    bool                     flow_started;   ///< for incoming, true iff initial credit has been granted
     bool                     drain_mode;
     bool                     stalled_outbound;  ///< Indicates that this link is stalled on outbound buffer backpressure
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ab4421db/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 706a26d..d92cc2b 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -795,32 +795,30 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
         //
         // We are trying to forward a delivery on an address that has no outbound paths
         // AND the incoming link is targeted (not anonymous).
-
-        // If the owning_addr is a multicast addr and there are no outbound paths, we will release this delivery
-        // and replenish the credit.
-
-        // For non multicast addresses, put the delivery on the incoming link's undelivered list.  Note that it is safe
-        // to do this because the undelivered list will be flushed once the number of
-        // paths transitions from zero to one.
         //
-        // Use the action-reference as the reference for undelivered rather
-        // than decrementing and incrementing the delivery ref_count.
+        // We shall release the delivery (it is currently undeliverable).  If the distribution is
+        // multicast, we will replenish the credit.  If it is anycast, we will allow the credit to
+        // drain.
         //
-        if (qdr_is_addr_treatment_multicast(link->owning_addr)) {
+        if (dlv->settled) {
+            // Increment the presettled_dropped_deliveries on the in_link
+            link->dropped_presettled_deliveries++;
+            core->dropped_presettled_deliveries++;
+        } else
             qdr_delivery_release_CT(core, dlv);
+
+        if (qdr_is_addr_treatment_multicast(link->owning_addr))
             qdr_link_issue_credit_CT(core, link, 1, false);
-            qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (no path)");
-            if (dlv->settled) {
-                // Increment the presettled_dropped_deliveries on the in_link
-                link->dropped_presettled_deliveries++;
-                core->dropped_presettled_deliveries++;
-            }
-        }
-        else {
-            DEQ_INSERT_TAIL(link->undelivered, dlv);
-            dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
-            qd_log(core->log, QD_LOG_DEBUG, "Delivery transfer:  dlv:%lx qdr_link_forward_CT: action-list -> undelivered-list", (long) dlv);
-        }
+        else
+            link->credit_pending++;
+
+        //
+        // Set the discard flag on the message only if the message is not completely received yet.
+        //
+        if (!receive_complete)
+            qd_message_set_discard(dlv->msg, true);
+
+        qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (no path)");
         return;
     }
 
@@ -834,7 +832,6 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
             if (qdr_connection_route_container(link->conn)) {
                 addr->deliveries_ingress_route_container++;
                 core->deliveries_ingress_route_container++;
-
             }
 
         }
@@ -1151,7 +1148,8 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
 
     qdr_deliver_continue_peers_CT(core, in_dlv);
 
-    if (qd_message_receive_complete(qdr_delivery_message(in_dlv))) {
+    qd_message_t *msg = qdr_delivery_message(in_dlv);
+    if (qd_message_receive_complete(msg) && !qd_message_is_discard(msg)) {
         //
         // The entire message has now been received. Check to see if there are in process subscriptions that need to
         // receive this message. in process subscriptions, at this time, can deal only with full messages.
@@ -1203,12 +1201,12 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bo
     bool drain_changed = link->drain_mode |= drain;
     link->drain_mode   = drain;
 
+    if (link->credit_pending > 0)
+        link->credit_pending = link->credit_pending > credit ? link->credit_pending - credit : 0;
+
     if (!drain_changed && credit == 0)
         return;
 
-    if (credit > 0)
-        link->flow_started = true;
-
     qdr_link_work_t *work = new_qdr_link_work_t();
     ZERO(work);
 
@@ -1271,8 +1269,8 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr)
             //
             // Issue credit to stalled links
             //
-            if (!link->flow_started)
-                qdr_link_issue_credit_CT(core, link, link->capacity, false);
+            if (link->credit_pending > 0)
+                qdr_link_issue_credit_CT(core, link, link->credit_pending, false);
 
             //
             // Drain undelivered deliveries via the forwarder

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ab4421db/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index ea9f3cf..3664acf 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -283,7 +283,7 @@ class OneRouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
-    def test_26_multicast_no_receivcer(self):
+    def test_26_multicast_no_receiver(self):
         test = MulticastUnsettledNoReceiverTest(self.address)
         test.run()
         self.assertEqual(None, test.error)
@@ -410,6 +410,51 @@ class OneRouterTest(TestCase):
         client.connection.close()
 
 
+class Entity(object):
+    def __init__(self, status_code, status_description, attrs):
+        self.status_code        = status_code
+        self.status_description = status_description
+        self.attrs              = attrs
+
+    def __getattr__(self, key):
+        return self.attrs[key]
+
+
+class RouterProxy(object):
+    def __init__(self, reply_addr):
+        self.reply_addr = reply_addr
+
+    def response(self, msg):
+        ap = msg.properties
+        bd = msg.body
+        if bd.__class__ == dict and 'results' in bd and 'attributeNames' in bd:
+            ##
+            ## This is a query response
+            ##
+            response = []
+            anames = bd['attributeNames']
+            for row in bd['results']:
+                cols = {}
+                for i in range(len(row)):
+                    cols[anames[i]] = row[i]
+                response.append(Entity(ap['statusCode'], ap['statusDescription'], cols))
+            return response
+
+        return Entity(ap['statusCode'], ap['statusDescription'], msg.body)
+
+    def read_address(self, name):
+        ap = {'operation': 'READ', 'type': 'org.apache.qpid.dispatch.router.address', 'name': name}
+        return Message(properties=ap, reply_to=self.reply_addr)
+
+    def query_addresses(self):
+        ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.router.address'}
+        return Message(properties=ap, reply_to=self.reply_addr)
+
+    def query_links(self):
+        ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.router.link'}
+        return Message(properties=ap, reply_to=self.reply_addr)
+
+
 class SemanticsClosest(MessagingHandler):
     def __init__(self, address):
         super(SemanticsClosest, self).__init__()
@@ -2282,10 +2327,10 @@ class MulticastUnsettledNoReceiverTest(MessagingHandler):
     these messages since there is no receiver.
     """
     def __init__(self, address):
-        super(MulticastUnsettledNoReceiverTest, self).__init__(prefetch=0)
+        super(MulticastUnsettledNoReceiverTest, self).__init__()
         self.address = address
         self.dest = "multicast.MulticastNoReceiverTest"
-        self.error = "Some error"
+        self.error = None
         self.n_sent = 0
         self.max_send = 250
         self.n_released = 0
@@ -2293,55 +2338,59 @@ class MulticastUnsettledNoReceiverTest(MessagingHandler):
         self.timer = None
         self.conn = None
         self.sender = None
+        self.query_sent = False
+
+    def timeout(self):
+        self.error = "Timeout expired: n_sent=%d n_released=%d n_accepted=%d" % \
+                     (self.n_sent, self.n_released, self.n_accepted)
+        self.conn.close()
 
     def check_if_done(self):
         if self.n_accepted > 0:
             self.error = "Messages should not be accepted as there are no receivers"
             self.timer.cancel()
             self.conn.close()
-        elif self.n_sent == self.n_released:
-            self.error = None
-
-            if not self.error:
-                local_node = Node.connect(self.address, timeout=TIMEOUT)
-                for result in local_node.query(type='org.apache.qpid.dispatch.router.link').results:
-                    if result[5] == 'in' and 'multicast.MulticastNoReceiverTest' in result[6]:
-                        if result[16] != 250:
-                            self.error = "Expected 250 dropped presettled deliveries but got " + str(result[16])
-                        else:
-                            outs = local_node.query(type='org.apache.qpid.dispatch.router')
-                            pos = outs.attribute_names.index("droppedPresettledDeliveries")
-                            results = outs.results[0]
-                            if results[pos] != 250:
-                                self.error = "When querying router, expected 250 dropped presettled " \
-                                             "deliveries but got " + str(results[pos])
-                            else:
-                                pos = outs.attribute_names.index("releasedDeliveries")
-                                if results[pos] < 250:
-                                    self.error = "The number of released deliveries cannot be less that 250 " \
-                                                 "but it is " + str(results[pos])
-
-            self.timer.cancel()
-            self.conn.close()
+        elif self.max_send == self.n_released and not self.query_sent:
+            self.mgmt_tx.send(self.proxy.query_links())
+            self.query_sent = True
 
     def on_start(self, event):
         self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
         self.conn = event.container.connect(self.address)
-        self.sender = event.container.create_sender(self.conn, self.dest)
+        self.mgmt_rx = event.container.create_receiver(self.conn, dynamic=True)
+        self.mgmt_tx = event.container.create_sender(self.conn, '$management')
+
+    def on_link_opened(self, event):
+        if event.receiver == self.mgmt_rx:
+            self.proxy  = RouterProxy(self.mgmt_rx.remote_source.address)
+            self.sender = event.container.create_sender(self.conn, self.dest)
+
+    def on_message(self, event):  # handles the reply to the link QUERY sent from check_if_done
+        if event.receiver == self.mgmt_rx:
+            results = self.proxy.response(event.message)
+            for link in results:
+                if link.linkDir == 'in' and link.owningAddr == 'M0' + self.dest:  # this test's incoming link
+                    if link.releasedCount != self.max_send:
+                        self.error = "Released count expected %d, got %d" % (self.max_send, link.releasedCount)  # report the counter that was compared
+            self.timer.cancel()
+            self.conn.close()
 
     def on_sendable(self, event):
-        if self.n_sent >= self.max_send:
-            return
-        self.n_sent += 1
-        msg = Message(body=self.n_sent)
-        event.sender.send(msg)
+        if event.sender == self.sender:
+            if self.n_sent >= self.max_send:
+                return
+            self.n_sent += 1
+            msg = Message(body=self.n_sent)
+            event.sender.send(msg)
 
     def on_accepted(self, event):
-        self.n_accepted += 1
+        if event.sender == self.sender:
+            self.n_accepted += 1
         self.check_if_done()
 
     def on_released(self, event):
-        self.n_released += 1
+        if event.sender == self.sender:
+            self.n_released += 1
         self.check_if_done()
 
     def run(self):
@@ -2638,19 +2687,6 @@ class PresettledOverflowTest(MessagingHandler):
                     if result[5] == 'out' and 'balanced.PresettledOverflow' in result[6]:
                         if result[16] != 250:
                             self.error = "Expected 250 dropped presettled deliveries but got " + str(result[16])
-                        else:
-                            outs = local_node.query(type='org.apache.qpid.dispatch.router')
-                            pos_presett = outs.attribute_names.index("presettledDeliveries")
-                            pos = outs.attribute_names.index("droppedPresettledDeliveries")
-                            results = outs.results[0]
-                            # Enforce presettledDeliveries metric is updated
-                            if results[pos_presett] < 500:
-                                self.error = "When querying router, expected 500 presettled " \
-                                             "deliveries but got " + str(results[pos_presett])
-                            # There is 250 from a previous test
-                            if results[pos] < 500:
-                                self.error = "When querying router, expected 500 dropped presettled " \
-                                             "deliveries but got " + str(results[pos])
             self.conn.close()
             self.timer.cancel()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org