You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/10/16 15:24:26 UTC
qpid-dispatch git commit: DISPATCH-1145 - Implemented responsibility
(3). Deliveries with Mobile addresses now flow from interior to edge.
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 94be97fa5 -> 0138754e5
DISPATCH-1145 - Implemented responsibility (3). Deliveries with Mobile addresses now flow from interior to edge.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/0138754e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/0138754e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/0138754e
Branch: refs/heads/master
Commit: 0138754e5811e761a68024f5c00af13e8d153fd5
Parents: 94be97f
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Oct 16 11:23:18 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Oct 16 11:23:18 2018 -0400
----------------------------------------------------------------------
.../modules/edge_router/addr_proxy.c | 22 ++++++++++++++
src/router_core/router_core.c | 19 +++++++------
src/router_core/router_core_private.h | 1 +
tests/system_tests_edge_router.py | 30 ++++++++++++++------
4 files changed, 56 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0138754e/src/router_core/modules/edge_router/addr_proxy.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c
index d5dc237..34d5697 100644
--- a/src/router_core/modules/edge_router/addr_proxy.c
+++ b/src/router_core/modules/edge_router/addr_proxy.c
@@ -41,6 +41,7 @@ struct qcm_edge_addr_proxy_t {
qdrc_event_subscription_t *event_sub;
bool uplink_established;
qdr_address_t *uplink_addr;
+ qdr_connection_t *uplink_conn;
};
@@ -54,6 +55,15 @@ static qdr_terminus_t *qdr_terminus_edge_downlink(const char *addr)
}
+static qdr_terminus_t *qdr_terminus_normal(const char *addr)
+{
+ qdr_terminus_t *term = qdr_terminus(0);
+ if (addr)
+ qdr_terminus_set_address(term, addr);
+ return term;
+}
+
+
static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *conn)
{
qcm_edge_addr_proxy_t *ap = (qcm_edge_addr_proxy_t*) context;
@@ -64,6 +74,7 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c
// Flag the uplink as being established.
//
ap->uplink_established = true;
+ ap->uplink_conn = conn;
//
// Attach an anonymous sending link to the interior router.
@@ -96,6 +107,7 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c
case QDRC_EVENT_CONN_EDGE_LOST :
ap->uplink_established = false;
+ ap->uplink_conn = 0;
break;
default:
@@ -108,6 +120,7 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c
static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr)
{
qcm_edge_addr_proxy_t *ap = (qcm_edge_addr_proxy_t*) context;
+ qdr_link_t *link;
//
// If we don't have an established uplink, there is no further work to be done.
@@ -124,9 +137,18 @@ static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr
switch (event) {
case QDRC_EVENT_ADDR_BECAME_LOCAL_DEST :
+ link = qdr_create_link_CT(ap->core, ap->uplink_conn, QD_LINK_ENDPOINT, QD_INCOMING,
+ qdr_terminus_normal(key + 2), qdr_terminus_normal(0));
+ qdr_core_bind_address_link_CT(ap->core, addr, link);
+ addr->edge_inlink = link;
+
break;
case QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST :
+ link = addr->edge_inlink;
+ qdr_core_unbind_address_link_CT(ap->core, addr, link);
+ qdr_link_outbound_detach_CT(ap->core, link, 0, QDR_CONDITION_NONE, true);
+
break;
default:
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0138754e/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index f7ddb5b..b984532 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -429,16 +429,19 @@ void qdr_core_bind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_li
void qdr_core_unbind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link)
{
- qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
link->owning_addr = 0;
- if (DEQ_SIZE(addr->rlinks) == 0) {
- const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
- if (key && (*key == QD_ITER_HASH_PREFIX_MOBILE || *key == QD_ITER_HASH_PREFIX_EDGE_SUMMARY))
- qdr_post_mobile_removed_CT(core, key);
- qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST, addr);
- } else if (DEQ_SIZE(addr->rlinks) == 1 && qd_bitmask_cardinality(addr->rnodes) == 0)
- qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr);
+ if (link->link_direction == QD_OUTGOING) {
+ qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
+ if (DEQ_SIZE(addr->rlinks) == 0) {
+ const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
+ if (key && (*key == QD_ITER_HASH_PREFIX_MOBILE || *key == QD_ITER_HASH_PREFIX_EDGE_SUMMARY))
+ qdr_post_mobile_removed_CT(core, key);
+ qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST, addr);
+ } else if (DEQ_SIZE(addr->rlinks) == 1 && qd_bitmask_cardinality(addr->rnodes) == 0)
+ qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr);
+ } else
+ qdr_del_link_ref(&addr->inlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0138754e/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index c027dc4..45f4437 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -477,6 +477,7 @@ struct qdr_address_t {
qd_hash_handle_t *hash_handle; ///< Linkage back to the hash table entry
qdrc_endpoint_desc_t *core_endpoint; ///< [ref] Set if this address is bound to an in-core endpoint
void *core_endpoint_context;
+ qdr_link_t *edge_inlink; ///< [ref] In-link from connected Interior router (on edge router)
qd_address_treatment_t treatment;
qdr_forwarder_t *forwarder;
int ref_count; ///< Number of link-routes + auto-links referencing this address
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0138754e/tests/system_tests_edge_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index 803ba6d..b489e2b 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -38,10 +38,10 @@ class RouterTest(TestCase):
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'multiTenant': 'yes'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'role': 'route-container'}),
- ('linkRoute', {'prefix': '0.0.0.0/link', 'dir': 'in', 'containerId': 'LRC'}),
- ('linkRoute', {'prefix': '0.0.0.0/link', 'dir': 'out', 'containerId': 'LRC'}),
- ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'dir': 'in'}),
- ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'dir': 'out'}),
+ ('linkRoute', {'prefix': '0.0.0.0/link', 'direction': 'in', 'containerId': 'LRC'}),
+ ('linkRoute', {'prefix': '0.0.0.0/link', 'direction': 'out', 'containerId': 'LRC'}),
+ ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'direction': 'in'}),
+ ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'direction': 'out'}),
('address', {'prefix': 'closest', 'distribution': 'closest'}),
('address', {'prefix': 'spread', 'distribution': 'balanced'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
@@ -139,7 +139,6 @@ class RouterTest(TestCase):
self.assertEqual(None, test.error)
def test_11_mobile_address_interior_to_edge(self):
- self.skipTest("Temporarily disabled")
test = MobileAddressTest(self.routers[2].addresses[0],
self.routers[0].addresses[0],
"test_11")
@@ -335,13 +334,17 @@ class MobileAddressTest(MessagingHandler):
self.receiver_conn = None
self.sender_conn = None
self.receiver = None
- self.count = 10
+ self.count = 300
+ self.rel_count = 50
self.n_rcvd = 0
self.n_sent = 0
+ self.n_settled = 0
+ self.n_released = 0
self.error = None
def timeout(self):
- self.error = "Timeout Expired - n_sent=%d n_rcvd=%d addr=%s" % (self.n_sent, self.n_rcvd, self.address)
+ self.error = "Timeout Expired - n_sent=%d n_rcvd=%d n_settled=%d n_released=%d addr=%s" % \
+ (self.n_sent, self.n_rcvd, self.n_settled, self.n_released, self.address)
self.receiver_conn.close()
self.sender_conn.close()
@@ -359,7 +362,18 @@ class MobileAddressTest(MessagingHandler):
def on_message(self, event):
self.n_rcvd += 1
- if self.n_rcvd == self.count:
+
+ def on_settled(self, event):
+ self.n_settled += 1
+ if self.n_settled == self.count:
+ self.receiver.close()
+ for i in range(self.rel_count):
+ self.sender.send(Message(body="Message %d" % self.n_sent))
+ self.n_sent += 1
+
+ def on_released(self, event):
+ self.n_released += 1
+ if self.n_released == self.rel_count:
self.receiver_conn.close()
self.sender_conn.close()
self.timer.cancel()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org