You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/10/16 15:24:26 UTC

qpid-dispatch git commit: DISPATCH-1145 - Implemented responsibility (3). Deliveries with Mobile addresses now flow from interior to edge.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 94be97fa5 -> 0138754e5


DISPATCH-1145 - Implemented responsibility (3).  Deliveries with Mobile addresses now flow from interior to edge.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/0138754e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/0138754e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/0138754e

Branch: refs/heads/master
Commit: 0138754e5811e761a68024f5c00af13e8d153fd5
Parents: 94be97f
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Oct 16 11:23:18 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Oct 16 11:23:18 2018 -0400

----------------------------------------------------------------------
 .../modules/edge_router/addr_proxy.c            | 22 ++++++++++++++
 src/router_core/router_core.c                   | 19 +++++++------
 src/router_core/router_core_private.h           |  1 +
 tests/system_tests_edge_router.py               | 30 ++++++++++++++------
 4 files changed, 56 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0138754e/src/router_core/modules/edge_router/addr_proxy.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c
index d5dc237..34d5697 100644
--- a/src/router_core/modules/edge_router/addr_proxy.c
+++ b/src/router_core/modules/edge_router/addr_proxy.c
@@ -41,6 +41,7 @@ struct qcm_edge_addr_proxy_t {
     qdrc_event_subscription_t *event_sub;
     bool                       uplink_established;
     qdr_address_t             *uplink_addr;
+    qdr_connection_t          *uplink_conn;
 };
 
 
@@ -54,6 +55,15 @@ static qdr_terminus_t *qdr_terminus_edge_downlink(const char *addr)
 }
 
 
+static qdr_terminus_t *qdr_terminus_normal(const char *addr)
+{
+    qdr_terminus_t *term = qdr_terminus(0);
+    if (addr)
+        qdr_terminus_set_address(term, addr);
+    return term;
+}
+
+
 static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *conn)
 {
     qcm_edge_addr_proxy_t *ap = (qcm_edge_addr_proxy_t*) context;
@@ -64,6 +74,7 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c
         // Flag the uplink as being established.
         //
         ap->uplink_established = true;
+        ap->uplink_conn        = conn;
 
         //
         // Attach an anonymous sending link to the interior router.
@@ -96,6 +107,7 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c
 
     case QDRC_EVENT_CONN_EDGE_LOST :
         ap->uplink_established = false;
+        ap->uplink_conn        = 0;
         break;
 
     default:
@@ -108,6 +120,7 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c
 static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr)
 {
     qcm_edge_addr_proxy_t *ap = (qcm_edge_addr_proxy_t*) context;
+    qdr_link_t            *link;
 
     //
     // If we don't have an established uplink, there is no further work to be done.
@@ -124,9 +137,18 @@ static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr
 
     switch (event) {
     case QDRC_EVENT_ADDR_BECAME_LOCAL_DEST :
+        link = qdr_create_link_CT(ap->core, ap->uplink_conn, QD_LINK_ENDPOINT, QD_INCOMING,
+                                  qdr_terminus_normal(key + 2), qdr_terminus_normal(0));
+        qdr_core_bind_address_link_CT(ap->core, addr, link);
+        addr->edge_inlink = link;
+
         break;
 
     case QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST :
+        link = addr->edge_inlink;
+        qdr_core_unbind_address_link_CT(ap->core, addr, link);
+        qdr_link_outbound_detach_CT(ap->core, link, 0, QDR_CONDITION_NONE, true);
+
         break;
 
     default:

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0138754e/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index f7ddb5b..b984532 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -429,16 +429,19 @@ void qdr_core_bind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_li
 
 void qdr_core_unbind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link)
 {
-    qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
     link->owning_addr = 0;
 
-    if (DEQ_SIZE(addr->rlinks) == 0) {
-        const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
-        if (key && (*key == QD_ITER_HASH_PREFIX_MOBILE || *key == QD_ITER_HASH_PREFIX_EDGE_SUMMARY))
-            qdr_post_mobile_removed_CT(core, key);
-        qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST, addr);
-    } else if (DEQ_SIZE(addr->rlinks) == 1 && qd_bitmask_cardinality(addr->rnodes) == 0)
-        qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr);
+    if (link->link_direction == QD_OUTGOING) {
+        qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
+        if (DEQ_SIZE(addr->rlinks) == 0) {
+            const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
+            if (key && (*key == QD_ITER_HASH_PREFIX_MOBILE || *key == QD_ITER_HASH_PREFIX_EDGE_SUMMARY))
+                qdr_post_mobile_removed_CT(core, key);
+            qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST, addr);
+        } else if (DEQ_SIZE(addr->rlinks) == 1 && qd_bitmask_cardinality(addr->rnodes) == 0)
+            qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr);
+    } else
+        qdr_del_link_ref(&addr->inlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0138754e/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index c027dc4..45f4437 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -477,6 +477,7 @@ struct qdr_address_t {
     qd_hash_handle_t          *hash_handle;   ///< Linkage back to the hash table entry
     qdrc_endpoint_desc_t      *core_endpoint; ///< [ref] Set if this address is bound to an in-core endpoint
     void                      *core_endpoint_context;
+    qdr_link_t                *edge_inlink;   ///< [ref] In-link from connected Interior router (on edge router)
     qd_address_treatment_t     treatment;
     qdr_forwarder_t           *forwarder;
     int                        ref_count;     ///< Number of link-routes + auto-links referencing this address

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0138754e/tests/system_tests_edge_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index 803ba6d..b489e2b 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -38,10 +38,10 @@ class RouterTest(TestCase):
                 ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
                 ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'multiTenant': 'yes'}),
                 ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'role': 'route-container'}),
-                ('linkRoute', {'prefix': '0.0.0.0/link', 'dir': 'in', 'containerId': 'LRC'}),
-                ('linkRoute', {'prefix': '0.0.0.0/link', 'dir': 'out', 'containerId': 'LRC'}),
-                ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'dir': 'in'}),
-                ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'dir': 'out'}),
+                ('linkRoute', {'prefix': '0.0.0.0/link', 'direction': 'in', 'containerId': 'LRC'}),
+                ('linkRoute', {'prefix': '0.0.0.0/link', 'direction': 'out', 'containerId': 'LRC'}),
+                ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'direction': 'in'}),
+                ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'direction': 'out'}),
                 ('address', {'prefix': 'closest', 'distribution': 'closest'}),
                 ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
                 ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
@@ -139,7 +139,6 @@ class RouterTest(TestCase):
         self.assertEqual(None, test.error)
 
     def test_11_mobile_address_interior_to_edge(self):
-        self.skipTest("Temporarily disabled")
         test = MobileAddressTest(self.routers[2].addresses[0],
                                  self.routers[0].addresses[0],
                                  "test_11")
@@ -335,13 +334,17 @@ class MobileAddressTest(MessagingHandler):
         self.receiver_conn = None
         self.sender_conn   = None
         self.receiver      = None
-        self.count         = 10
+        self.count         = 300
+        self.rel_count     = 50
         self.n_rcvd        = 0
         self.n_sent        = 0
+        self.n_settled     = 0
+        self.n_released    = 0
         self.error         = None
 
     def timeout(self):
-        self.error = "Timeout Expired - n_sent=%d n_rcvd=%d addr=%s" % (self.n_sent, self.n_rcvd, self.address)
+        self.error = "Timeout Expired - n_sent=%d n_rcvd=%d n_settled=%d n_released=%d addr=%s" % \
+                     (self.n_sent, self.n_rcvd, self.n_settled, self.n_released, self.address)
         self.receiver_conn.close()
         self.sender_conn.close()
 
@@ -359,7 +362,18 @@ class MobileAddressTest(MessagingHandler):
 
     def on_message(self, event):
         self.n_rcvd += 1
-        if self.n_rcvd == self.count:
+
+    def on_settled(self, event):
+        self.n_settled += 1
+        if self.n_settled == self.count:
+            self.receiver.close()
+            for i in range(self.rel_count):
+                self.sender.send(Message(body="Message %d" % self.n_sent))
+                self.n_sent += 1
+
+    def on_released(self, event):
+        self.n_released += 1
+        if self.n_released == self.rel_count:
             self.receiver_conn.close()
             self.sender_conn.close()
             self.timer.cancel()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org