You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/04/02 20:45:28 UTC
[1/3] qpid-dispatch git commit: DISPATCH-177 - Fixed leak of the link
name.
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 9b2d09fa0 -> ba920ba14
DISPATCH-177 - Fixed leak of the link name.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/3f2905fe
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/3f2905fe
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/3f2905fe
Branch: refs/heads/master
Commit: 3f2905fe376f70c4923bf03093b7a81ca44aa8b5
Parents: 9b2d09f
Author: Ted Ross <tr...@redhat.com>
Authored: Sat Apr 2 11:53:04 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Sat Apr 2 11:53:04 2016 -0400
----------------------------------------------------------------------
src/router_core/connections.c | 6 ++++++
src/router_core/forwarder.c | 4 +++-
2 files changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3f2905fe/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 60ae71a..5758ba9 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -485,6 +485,12 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
qdr_del_link_ref(&conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
qdr_del_link_ref(&conn->links_with_credit, link, QDR_LINK_LIST_CLASS_FLOW);
sys_mutex_unlock(conn->work_lock);
+
+ //
+ // Free the link's name
+ //
+ free(link->name);
+ link->name = 0;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3f2905fe/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 7c4641a..eddb079 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -473,10 +473,12 @@ bool qdr_forward_link_balanced_CT(qdr_core_t *core,
out_link->conn = conn;
out_link->link_type = QD_LINK_ENDPOINT;
out_link->link_direction = qdr_link_direction(in_link) == QD_OUTGOING ? QD_INCOMING : QD_OUTGOING;
- out_link->name = in_link->name;
out_link->admin_enabled = true;
out_link->oper_status = QDR_LINK_OPER_DOWN;
+ out_link->name = (char*) malloc(strlen(in_link->name) + 1); // +1 for the NUL terminator copied by strcpy
+ strcpy(out_link->name, in_link->name);
+
out_link->connected_link = in_link;
in_link->connected_link = out_link;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/3] qpid-dispatch git commit: DISPATCH-253 - Configurable link
capacity via listener and connector.
Posted by tr...@apache.org.
DISPATCH-253 - Configurable link capacity via listener and connector.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ceb3b389
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ceb3b389
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ceb3b389
Branch: refs/heads/master
Commit: ceb3b389aecdc326b67a72506cccf4a5e9d6b048
Parents: 3f2905f
Author: Ted Ross <tr...@redhat.com>
Authored: Sat Apr 2 12:59:33 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Sat Apr 2 12:59:33 2016 -0400
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 4 +++-
include/qpid/dispatch/server.h | 7 ++++++-
python/qpid_dispatch/management/qdrouter.json | 14 +++++++++++++-
src/connection_manager.c | 11 +++++++++++
src/router_core/connections.c | 12 +++++++-----
src/router_core/router_core_private.h | 1 +
src/router_node.c | 10 +++++++---
7 files changed, 48 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ceb3b389/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 9c77a08..b7fa2cf 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -153,6 +153,7 @@ typedef enum {
* correlate the connection with waypoints and link-route destinations that use the connection.
* @param strip_annotations_in True if configured to remove annotations on inbound messages.
* @param strip_annotations_out True if configured to remove annotations on outbound messages.
+ * @param link_capacity The capacity, in deliveries, for links in this connection.
* @return Pointer to a connection object that can be used to refer to this connection over its lifetime.
*/
qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
@@ -161,7 +162,8 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
const char *label,
const char *remote_container_id,
bool strip_annotations_in,
- bool strip_annotations_out);
+ bool strip_annotations_out,
+ int link_capacity);
/**
* qdr_connection_closed
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ceb3b389/include/qpid/dispatch/server.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index 71ac479..5a85e63 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -319,10 +319,15 @@ typedef struct qd_server_config_t {
/**
* If true, strip the outbound qpid dispatch specific message annotations. This only applies to ingress and egress routers.
* Annotations generated by inter-router messages will be untouched.
- */
+ */
bool strip_outbound_annotations;
/**
+ * The number of deliveries that can be in-flight concurrently for each link within the connection.
+ */
+ int link_capacity;
+
+ /**
* Path to the file containing the PEM-formatted public certificate for the local end
* of the connection.
*/
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ceb3b389/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 64cf6a4..48a735e 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -656,7 +656,13 @@
"default": "both",
"description": "['in', 'out', 'both', 'no'] in: Strip the dispatch router specific annotations only on ingress; out: Strip the dispatch router specific annotations only on egress; both: Strip the dispatch router specific annotations on both ingress and egress; no - do not strip dispatch router specific annotations",
"create": true
- }
+ },
+ "linkCapacity": {
+ "type": "integer",
+ "create": true,
+ "required": false,
+ "description": "The capacity of links within this connection, in terms of message deliveries. The capacity is the number of messages that can be in-flight concurrently for each link."
+ }
}
},
@@ -700,6 +706,12 @@
"description": "['in', 'out', 'both', 'no'] in: Strip the dispatch router specific annotations only on ingress; out: Strip the dispatch router specific annotations only on egress; both: Strip the dispatch router specific annotations on both ingress and egress; no - do not strip dispatch router specific annotations",
"create": true
},
+ "linkCapacity": {
+ "type": "integer",
+ "create": true,
+ "required": false,
+ "description": "The capacity of links within this connection, in terms of message deliveries. The capacity is the number of messages that can be in-flight concurrently for each link."
+ },
"saslUsername": {
"type": "string",
"required": false,
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ceb3b389/src/connection_manager.c
----------------------------------------------------------------------
diff --git a/src/connection_manager.c b/src/connection_manager.c
index 67495ac..19d4b2e 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -150,6 +150,17 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
config->sasl_password = qd_entity_opt_string(entity, "saslPassword", 0); CHECK();
config->sasl_mechanisms = qd_entity_opt_string(entity, "saslMechanisms", 0); CHECK();
config->ssl_enabled = has_attrs(entity, ssl_attributes, ssl_attributes_count);
+ config->link_capacity = qd_entity_opt_long(entity, "linkCapacity", 0); CHECK();
+
+ //
+ // Handle the defaults for link capacity.
+ //
+ if (config->link_capacity == 0) {
+ if (strcmp("inter-router", config->role) == 0)
+ config->link_capacity = 100000; // This is effectively infinite since session flow control will be more stringent.
+ else
+ config->link_capacity = 10;
+ }
//
// For now we are hardwiring this attribute to true. If there's an outcry from the
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ceb3b389/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 5758ba9..2521750 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -61,7 +61,8 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
const char *label,
const char *remote_container_id,
bool strip_annotations_in,
- bool strip_annotations_out)
+ bool strip_annotations_out,
+ int link_capacity)
{
qdr_action_t *action = qdr_action(qdr_connection_opened_CT, "connection_opened");
qdr_connection_t *conn = new_qdr_connection_t();
@@ -73,6 +74,7 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
conn->role = role;
conn->strip_annotations_in = strip_annotations_in;
conn->strip_annotations_out = strip_annotations_out;
+ conn->link_capacity = link_capacity;
conn->mask_bit = -1;
DEQ_INIT(conn->links);
DEQ_INIT(conn->work_list);
@@ -270,9 +272,9 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
link->name = (char*) malloc(strlen(name) + 1);
strcpy(link->name, name);
link->link_direction = dir;
- link->capacity = 32; // TODO - make this configurable
- link->admin_enabled = true;
- link->oper_status = QDR_LINK_OPER_DOWN;
+ link->capacity = conn->link_capacity;
+ link->admin_enabled = true;
+ link->oper_status = QDR_LINK_OPER_DOWN;
link->strip_annotations_in = conn->strip_annotations_in;
link->strip_annotations_out = conn->strip_annotations_out;
@@ -513,7 +515,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
link->conn = conn;
link->link_type = link_type;
link->link_direction = dir;
- link->capacity = 32; // TODO - make this configurable
+ link->capacity = conn->link_capacity;
link->name = (char*) malloc(QDR_DISCRIMINATOR_SIZE + 8);
qdr_generate_link_name("qdlink", link->name, QDR_DISCRIMINATOR_SIZE + 8);
link->admin_enabled = true;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ceb3b389/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index fed9f88..801eb9a 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -413,6 +413,7 @@ struct qdr_connection_t {
qdr_conn_identifier_t *conn_id;
bool strip_annotations_in;
bool strip_annotations_out;
+ int link_capacity;
int mask_bit;
qdr_connection_work_list_t work_list;
sys_mutex_t *work_lock;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ceb3b389/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 23b2602..530ca9f 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -45,7 +45,8 @@ static void qd_router_connection_get_config(const qd_connection_t *conn,
qdr_connection_role_t *role,
const char **name,
bool *strip_annotations_in,
- bool *strip_annotations_out)
+ bool *strip_annotations_out,
+ int *link_capacity)
{
if (conn) {
const qd_server_config_t *cf = qd_connection_config(conn);
@@ -53,6 +54,8 @@ static void qd_router_connection_get_config(const qd_connection_t *conn,
*strip_annotations_in = cf ? cf->strip_inbound_annotations : false;
*strip_annotations_out = cf ? cf->strip_outbound_annotations : false;
+ *link_capacity = cf ? cf->link_capacity : 1; // cf may be NULL (guarded the same way on the lines above)
+
if (cf && strcmp(cf->role, router_role) == 0) {
*strip_annotations_in = false;
*strip_annotations_out = false;
@@ -496,15 +499,16 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
qdr_connection_role_t role = 0;
bool strip_annotations_in = false;
bool strip_annotations_out = false;
+ int link_capacity = 1;
const char *name = 0;
pn_connection_t *pn_conn = qd_connection_pn(conn);
qd_router_connection_get_config(conn, &role, &name,
- &strip_annotations_in, &strip_annotations_out);
+ &strip_annotations_in, &strip_annotations_out, &link_capacity);
qdr_connection_t *qdrc = qdr_connection_opened(router->router_core, inbound, role, name,
pn_connection_remote_container(pn_conn),
- strip_annotations_in, strip_annotations_out);
+ strip_annotations_in, strip_annotations_out, link_capacity);
qd_connection_set_context(conn, qdrc);
qdr_connection_set_context(qdrc, conn);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/3] qpid-dispatch git commit: DISPATCH-179 - Release deliveries on
lost links.
Posted by tr...@apache.org.
DISPATCH-179 - Release deliveries on lost links.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ba920ba1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ba920ba1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ba920ba1
Branch: refs/heads/master
Commit: ba920ba1488bf259ffc6f2ba5552c6710252cf05
Parents: ceb3b38
Author: Ted Ross <tr...@redhat.com>
Authored: Sat Apr 2 14:39:36 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Sat Apr 2 14:39:36 2016 -0400
----------------------------------------------------------------------
src/router_core/transfer.c | 10 +++++-
tests/system_tests_one_router.py | 55 +++++++++++++++++++++++++++++++
tests/system_tests_two_routers.py | 60 ++++++++++++++++++++++++++++++++++
3 files changed, 124 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ba920ba1/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 883801b..f05e96f 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -248,8 +248,16 @@ qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery)
// In-Thread Functions
//==================================================================================
-void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery)
+void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *dlv)
{
+ bool push = dlv->disposition != PN_RELEASED;
+
+ dlv->disposition = PN_RELEASED;
+ dlv->settled = true;
+ bool moved = qdr_delivery_settled_CT(core, dlv);
+
+ if (push || moved)
+ qdr_delivery_push_CT(core, dlv);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ba920ba1/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 028ba1a..94b32dd 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -1053,6 +1053,15 @@ class RouterTest(TestCase):
self.assertTrue(send_settle_mode_test.message_received)
self.assertTrue(send_settle_mode_test.delivery_already_settled)
+ def test_14_excess_deliveries_released(self):
+ """
+ Message-route a series of deliveries where the receiver provides credit for a subset and
+ once received, closes the link. The remaining deliveries should be released back to the sender.
+ """
+ test = ExcessDeliveriesReleasedTest(self.address)
+ test.run()
+ self.assertEqual(None, test.error)
+
HELLO_WORLD = "Hello World!"
@@ -1089,5 +1098,51 @@ class SndSettleModeTest(MessagingHandler):
def run(self):
Container(self).run()
+
+class ExcessDeliveriesReleasedTest(MessagingHandler):
+ def __init__(self, address):
+ super(ExcessDeliveriesReleasedTest, self).__init__(prefetch=0)
+ self.address = address
+ self.dest = "closest.EDRtest"
+ self.error = None
+ self.sender = None
+ self.receiver = None
+ self.n_sent = 0
+ self.n_received = 0
+ self.n_accepted = 0
+ self.n_released = 0
+
+ def on_start(self, event):
+ conn = event.container.connect(self.address)
+ self.sender = event.container.create_sender(conn, self.dest)
+ self.receiver = event.container.create_receiver(conn, self.dest)
+ self.receiver.flow(6)
+
+ def on_sendable(self, event):
+ for i in range(10 - self.n_sent):
+ msg = Message(body=i)
+ event.sender.send(msg)
+ self.n_sent += 1
+
+ def on_accepted(self, event):
+ self.n_accepted += 1
+
+ def on_released(self, event):
+ self.n_released += 1
+ if self.n_released == 4:
+ if self.n_accepted != 6:
+ self.error = "Expected 6 accepted, got %d" % self.n_accepted
+ if self.n_received != 6:
+ self.error = "Expected 6 received, got %d" % self.n_received
+ event.connection.close()
+
+ def on_message(self, event):
+ self.n_received += 1
+ if self.n_received == 6:
+ self.receiver.close()
+
+ def run(self):
+ Container(self).run()
+
if __name__ == '__main__':
unittest.main(main_module())
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ba920ba1/tests/system_tests_two_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index 8962a7c..2425b7f 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -20,6 +20,8 @@
import unittest, os
from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable, Timeout
from system_test import TestCase, Qdrouterd, main_module, DIR
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, AtMostOnce, AtLeastOnce
# PROTON-828:
try:
@@ -994,6 +996,64 @@ class RouterTest(TestCase):
M1.stop()
M2.stop()
+ def test_14_excess_deliveries_released(self):
+ """
+ Message-route a series of deliveries where the receiver provides credit for a subset and
+ once received, closes the link. The remaining deliveries should be released back to the sender.
+ """
+ test = ExcessDeliveriesReleasedTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+class ExcessDeliveriesReleasedTest(MessagingHandler):
+ def __init__(self, address1, address2):
+ super(ExcessDeliveriesReleasedTest, self).__init__(prefetch=0)
+ self.address1 = address1
+ self.address2 = address2
+ self.dest = "closest.EDRtest"
+ self.error = None
+ self.sender = None
+ self.receiver = None
+ self.n_sent = 0
+ self.n_received = 0
+ self.n_accepted = 0
+ self.n_released = 0
+
+ def on_start(self, event):
+ self.conn1 = event.container.connect(self.address1)
+ self.conn2 = event.container.connect(self.address2)
+ self.sender = event.container.create_sender(self.conn1, self.dest)
+ self.receiver = event.container.create_receiver(self.conn2, self.dest)
+ self.receiver.flow(6)
+
+ def on_sendable(self, event):
+ for i in range(10 - self.n_sent):
+ msg = Message(body=i)
+ event.sender.send(msg)
+ self.n_sent += 1
+
+ def on_accepted(self, event):
+ self.n_accepted += 1
+
+ def on_released(self, event):
+ self.n_released += 1
+ if self.n_released == 4:
+ if self.n_accepted != 6:
+ self.error = "Expected 6 accepted, got %d" % self.n_accepted
+ if self.n_received != 6:
+ self.error = "Expected 6 received, got %d" % self.n_received
+ self.conn1.close()
+ self.conn2.close()
+
+ def on_message(self, event):
+ self.n_received += 1
+ if self.n_received == 6:
+ self.receiver.close()
+
+ def run(self):
+ Container(self).run()
+
try:
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org