You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/03/29 01:22:54 UTC
qpid-dispatch git commit: DISPATCH-179 - system_tests_one_router now passes - Fixed handling of delivery lists in links.
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 4ace33e70 -> 19abbfc2c
DISPATCH-179 - system_tests_one_router now passes - Fixed handling of delivery lists in links.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/19abbfc2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/19abbfc2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/19abbfc2
Branch: refs/heads/master
Commit: 19abbfc2c1bdba3ff2e84ab9c4b61f9a76bae794
Parents: 4ace33e
Author: Ted Ross <tr...@redhat.com>
Authored: Mon Mar 28 19:22:10 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon Mar 28 19:22:10 2016 -0400
----------------------------------------------------------------------
src/router_core/connections.c | 8 +--
src/router_core/forwarder.c | 11 +--
src/router_core/router_core_private.h | 6 ++
src/router_core/transfer.c | 107 ++++++++++++++++-------------
tests/system_tests_one_router.py | 29 ++++----
5 files changed, 92 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/19abbfc2/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index ee9ec04..597d858 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -442,7 +442,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
//
// Free the undelivered deliveries. If this is an incoming link, the
- // undelivereds can simply be desetroyed. If it's an outgoing link, the
+ // undelivereds can simply be destroyed. If it's an outgoing link, the
// undelivereds' peer deliveries need to be released.
//
qdr_delivery_t *dlv = DEQ_HEAD(undelivered);
@@ -466,10 +466,8 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
DEQ_REMOVE_HEAD(unsettled);
peer = dlv->peer;
qdr_delivery_free(dlv);
- if (peer) {
+ if (peer)
peer->peer = 0;
- qdr_delivery_remove_unsettled_CT(core, peer);
- }
dlv = DEQ_HEAD(unsettled);
}
@@ -479,7 +477,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
sys_mutex_lock(conn->work_lock);
qdr_del_link_ref(&conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
- qdr_del_link_ref(&conn->links_with_credit , link, QDR_LINK_LIST_CLASS_FLOW);
+ qdr_del_link_ref(&conn->links_with_credit, link, QDR_LINK_LIST_CLASS_FLOW);
sys_mutex_unlock(conn->work_lock);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/19abbfc2/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 0b305d2..8e41995 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -71,23 +71,23 @@ static bool qdr_forward_attach_null_CT(qdr_core_t *core,
}
-qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg)
+qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, qdr_link_t *link, qd_message_t *msg)
{
qdr_delivery_t *dlv = new_qdr_delivery_t();
ZERO(dlv);
dlv->link = link;
dlv->msg = qd_message_copy(msg);
- dlv->settled = !peer || peer->settled;
+ dlv->settled = !in_dlv || in_dlv->settled;
dlv->tag = core->next_tag++;
//
// Create peer linkage only if the delivery is not settled
//
if (!dlv->settled) {
- if (peer && peer->peer == 0) {
- dlv->peer = peer;
- peer->peer = dlv; // TODO - make this a back-list for multicast tracking
+ if (in_dlv && in_dlv->peer == 0) {
+ dlv->peer = in_dlv;
+ in_dlv->peer = dlv; // TODO - make this a back-list for multicast tracking
}
}
@@ -99,6 +99,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *
{
sys_mutex_lock(link->conn->work_lock);
DEQ_INSERT_TAIL(link->undelivered, dlv);
+ dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
//
// If the link isn't already on the links_with_deliveries list, put it there.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/19abbfc2/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index dbf3f58..29de0f6 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -187,6 +187,11 @@ struct qdr_router_ref_t {
ALLOC_DECLARE(qdr_router_ref_t);
DEQ_DECLARE(qdr_router_ref_t, qdr_router_ref_list_t);
+typedef enum {
+ QDR_DELIVERY_NOWHERE = 0,
+ QDR_DELIVERY_IN_UNDELIVERED,
+ QDR_DELIVERY_IN_UNSETTLED
+} qdr_delivery_where_t;
struct qdr_delivery_t {
DEQ_LINKS(qdr_delivery_t);
@@ -198,6 +203,7 @@ struct qdr_delivery_t {
qd_field_iterator_t *origin;
uint64_t disposition;
bool settled;
+ qdr_delivery_where_t where;
uint64_t tag;
qd_bitmask_t *link_exclusion;
};
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/19abbfc2/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index e348b3e..0a19444 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -105,8 +105,11 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
dlv = DEQ_HEAD(link->undelivered);
if (dlv) {
DEQ_REMOVE_HEAD(link->undelivered);
- if (!dlv->settled)
+ if (!dlv->settled) {
DEQ_INSERT_TAIL(link->unsettled, dlv);
+ dlv->where = QDR_DELIVERY_IN_UNSETTLED;
+ } else
+ dlv->where = QDR_DELIVERY_NOWHERE;
credit--;
link->total_deliveries++;
offer = DEQ_SIZE(link->undelivered);
@@ -203,20 +206,6 @@ void qdr_delivery_free(qdr_delivery_t *delivery)
}
-void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery)
-{
-}
-
-
-void qdr_delivery_remove_unsettled_CT(qdr_core_t *core, qdr_delivery_t *delivery)
-{
- //
- // Remove a delivery from its unsettled list. Side effects include issuing
- // replacement credit and visiting the link-quiescence algorithm
- //
-}
-
-
void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition, bool settled)
{
qdr_action_t *action = qdr_action(qdr_update_delivery_CT, "update_delivery");
@@ -257,6 +246,48 @@ qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery)
// In-Thread Functions
//==================================================================================
+void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery)
+{
+}
+
+
+void qdr_delivery_remove_unsettled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
+{
+ //
+ // Remove a delivery from its unsettled list. Side effects include issuing
+ // replacement credit and visiting the link-quiescence algorithm
+ //
+ qdr_link_t *link = dlv->link;
+ qdr_connection_t *conn = link ? link->conn : 0;
+ bool issue_credit = false;
+
+ if (!link || !conn)
+ return;
+
+ //
+ // The lock needs to be acquired only for outgoing links
+ //
+ if (link->link_direction == QD_OUTGOING)
+ sys_mutex_lock(conn->work_lock);
+
+ if (dlv->where == QDR_DELIVERY_IN_UNSETTLED) {
+ DEQ_REMOVE(link->unsettled, dlv);
+ dlv->where = QDR_DELIVERY_NOWHERE;
+ issue_credit = true;
+ }
+
+ if (link->link_direction == QD_OUTGOING)
+ sys_mutex_unlock(conn->work_lock);
+
+ //
+ // If this is an incoming link and it is not link-routed, issue
+ // one replacement credit on the link.
+ //
+ if (issue_credit && link->link_direction == QD_INCOMING && !link->connected_link)
+ qdr_link_issue_credit_CT(core, link, 1);
+}
+
+
static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
if (discard)
@@ -316,12 +347,14 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
// a valid destination).
//
DEQ_INSERT_TAIL(link->undelivered, dlv);
+ dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
} else {
//
- // TODO - Release the delivery
+ // Release the delivery
//
+ qdr_delivery_release_CT(core, dlv);
}
- } else if (fanout == 1) {
+ } else if (fanout > 0) {
qd_bitmask_free(dlv->link_exclusion);
dlv->link_exclusion = 0;
if (dlv->settled) {
@@ -338,19 +371,9 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
assert(!dlv->peer);
qdr_delivery_free(dlv);
}
- } else
+ } else {
DEQ_INSERT_TAIL(link->unsettled, dlv);
- } else {
- //
- // The fanout is greater than one. Do something! TODO
- //
- qd_bitmask_free(dlv->link_exclusion);
- dlv->link_exclusion = 0;
-
- if (presettled) {
- qdr_link_issue_credit_CT(core, link, 1);
- assert(!dlv->peer);
- qdr_delivery_free(dlv);
+ dlv->where = QDR_DELIVERY_IN_UNSETTLED;
}
}
@@ -388,8 +411,10 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
if (!addr && dlv->to_addr)
qd_hash_retrieve(core->addr_hash, dlv->to_addr, (void**) &addr);
qdr_link_forward_CT(core, link, dlv, addr);
- } else
+ } else {
DEQ_INSERT_TAIL(link->undelivered, dlv);
+ dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
+ }
}
@@ -427,8 +452,6 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
uint64_t disp = action->args.delivery.disposition;
bool settled = action->args.delivery.settled;
- bool link_routed = dlv && dlv->link && dlv->link->connected_link;
-
//
// Logic:
//
@@ -455,22 +478,12 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
push = true;
peer->peer = 0;
dlv->peer = 0;
- if (peer->link) {
- sys_mutex_lock(peer->link->conn->work_lock);
- DEQ_REMOVE(peer->link->unsettled, peer);
- sys_mutex_unlock(peer->link->conn->work_lock);
- if (peer->link->link_direction == QD_INCOMING && !link_routed)
- qdr_link_issue_credit_CT(core, peer->link, 1);
- }
+ if (peer->link)
+ qdr_delivery_remove_unsettled_CT(core, peer);
}
- if (dlv->link) {
- sys_mutex_lock(dlv->link->conn->work_lock);
- DEQ_REMOVE(dlv->link->unsettled, dlv);
- sys_mutex_unlock(dlv->link->conn->work_lock);
- if (dlv->link->link_direction == QD_INCOMING && !link_routed)
- qdr_link_issue_credit_CT(core, dlv->link, 1);
- }
+ if (dlv->link)
+ qdr_delivery_remove_unsettled_CT(core, dlv);
qdr_delivery_free(dlv);
}
@@ -567,7 +580,7 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr)
void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv)
{
- if (!dlv || !dlv->link)
+ if (!dlv || !dlv->link || dlv->where == QDR_DELIVERY_IN_UNDELIVERED)
return;
qdr_link_t *link = dlv->link;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/19abbfc2/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 607545e..cfc3646 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -53,10 +53,9 @@ class RouterTest(TestCase):
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'out'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'in'}),
- ('fixedAddress', {'prefix': '/closest/', 'fanout': 'single', 'bias': 'closest'}),
- ('fixedAddress', {'prefix': '/spread/', 'fanout': 'single', 'bias': 'spread'}),
- ('fixedAddress', {'prefix': '/multicast/', 'fanout': 'multiple'}),
- ('fixedAddress', {'prefix': '/', 'fanout': 'multiple'})
+ ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+ ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
+ ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
])
cls.router = cls.tester.qdrouterd(name, config)
cls.router.wait_ready()
@@ -89,7 +88,7 @@ class RouterTest(TestCase):
M2.stop()
def test_02a_multicast_unsettled(self):
- addr = self.address+"/pre_settled/multicast/1"
+ addr = self.address+"/multicast.unsettled.1"
M1 = self.messenger()
M2 = self.messenger()
M3 = self.messenger()
@@ -280,14 +279,20 @@ class RouterTest(TestCase):
M1.outgoing_window = 5
M1.start()
+ M1.timeout = 1
tm = Message()
tm.address = addr
tm.body = {'number': 200}
- tx_tracker = M1.put(tm)
- M1.send(0)
- M1.flush()
- self.assertEqual(MODIFIED, M1.status(tx_tracker))
+ exception = False
+ try:
+ M1.put(tm)
+ M1.send(0)
+ M1.flush()
+ except Exception:
+ exception = True
+
+ self.assertEqual(exception, True)
M1.stop()
@@ -833,7 +838,7 @@ class RouterTest(TestCase):
def test_10_semantics_multicast(self):
- addr = self.address+"/multicast/1"
+ addr = self.address+"/multicast.10"
M1 = self.messenger()
M2 = self.messenger()
M3 = self.messenger()
@@ -877,7 +882,7 @@ class RouterTest(TestCase):
M4.stop()
def test_11_semantics_closest(self):
- addr = self.address+"/closest/1"
+ addr = self.address+"/closest.1"
M1 = self.messenger()
M2 = self.messenger()
M3 = self.messenger()
@@ -928,7 +933,7 @@ class RouterTest(TestCase):
M4.stop()
def test_12_semantics_spread(self):
- addr = self.address+"/spread/1"
+ addr = self.address+"/spread.1"
M1 = self.messenger()
M2 = self.messenger()
M3 = self.messenger()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org