You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/04/02 20:45:28 UTC

[1/3] qpid-dispatch git commit: DISPATCH-177 - Fixed leak of the link name.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 9b2d09fa0 -> ba920ba14


DISPATCH-177 - Fixed leak of the link name.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/3f2905fe
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/3f2905fe
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/3f2905fe

Branch: refs/heads/master
Commit: 3f2905fe376f70c4923bf03093b7a81ca44aa8b5
Parents: 9b2d09f
Author: Ted Ross <tr...@redhat.com>
Authored: Sat Apr 2 11:53:04 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Sat Apr 2 11:53:04 2016 -0400

----------------------------------------------------------------------
 src/router_core/connections.c | 6 ++++++
 src/router_core/forwarder.c   | 4 +++-
 2 files changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3f2905fe/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 60ae71a..5758ba9 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -485,6 +485,12 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
     qdr_del_link_ref(&conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
     qdr_del_link_ref(&conn->links_with_credit,     link, QDR_LINK_LIST_CLASS_FLOW);
     sys_mutex_unlock(conn->work_lock);
+
+    //
+    // Free the link's name
+    //
+    free(link->name);
+    link->name = 0;
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3f2905fe/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 7c4641a..eddb079 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -473,10 +473,12 @@ bool qdr_forward_link_balanced_CT(qdr_core_t     *core,
         out_link->conn           = conn;
         out_link->link_type      = QD_LINK_ENDPOINT;
         out_link->link_direction = qdr_link_direction(in_link) == QD_OUTGOING ? QD_INCOMING : QD_OUTGOING;
-        out_link->name           = in_link->name;
         out_link->admin_enabled  = true;
         out_link->oper_status    = QDR_LINK_OPER_DOWN;
 
+        out_link->name = (char*) malloc(strlen(in_link->name + 1));
+        strcpy(out_link->name, in_link->name);
+
         out_link->connected_link = in_link;
         in_link->connected_link  = out_link;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-dispatch git commit: DISPATCH-253 - Configurable link capacity via listener and connector.

Posted by tr...@apache.org.
DISPATCH-253 - Configurable link capacity via listener and connector.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ceb3b389
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ceb3b389
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ceb3b389

Branch: refs/heads/master
Commit: ceb3b389aecdc326b67a72506cccf4a5e9d6b048
Parents: 3f2905f
Author: Ted Ross <tr...@redhat.com>
Authored: Sat Apr 2 12:59:33 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Sat Apr 2 12:59:33 2016 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h           |  4 +++-
 include/qpid/dispatch/server.h                |  7 ++++++-
 python/qpid_dispatch/management/qdrouter.json | 14 +++++++++++++-
 src/connection_manager.c                      | 11 +++++++++++
 src/router_core/connections.c                 | 12 +++++++-----
 src/router_core/router_core_private.h         |  1 +
 src/router_node.c                             | 10 +++++++---
 7 files changed, 48 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ceb3b389/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 9c77a08..b7fa2cf 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -153,6 +153,7 @@ typedef enum {
  *        correlate the connection with waypoints and link-route destinations that use the connection.
  * @param strip_annotations_in True if configured to remove annotations on inbound messages.
  * @param strip_annotations_out True if configured to remove annotations on outbound messages.
+ * @param link_capacity The capacity, in deliveries, for links in this connection.
  * @return Pointer to a connection object that can be used to refer to this connection over its lifetime.
  */
 qdr_connection_t *qdr_connection_opened(qdr_core_t            *core,
@@ -161,7 +162,8 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t            *core,
                                         const char            *label,
                                         const char            *remote_container_id,
                                         bool                   strip_annotations_in,
-                                        bool                   strip_annotations_out);
+                                        bool                   strip_annotations_out,
+                                        int                    link_capacity);
 
 /**
  * qdr_connection_closed

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ceb3b389/include/qpid/dispatch/server.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index 71ac479..5a85e63 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -319,10 +319,15 @@ typedef struct qd_server_config_t {
     /**
      * If true, strip the outbound qpid dispatch specific message annotations. This only applies to ingress and egress routers.
      * Annotations generated by inter-router messages will be untouched.
-    */
+     */
     bool strip_outbound_annotations;
 
     /**
+     * The number of deliveries that can be in-flight concurrently for each link within the connection.
+     */
+    int link_capacity;
+
+    /**
      * Path to the file containing the PEM-formatted public certificate for the local end
      * of the connection.
      */

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ceb3b389/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 64cf6a4..48a735e 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -656,7 +656,13 @@
                     "default": "both",
                     "description": "['in', 'out', 'both', 'no'] in: Strip the dispatch router specific annotations only on ingress; out: Strip the dispatch router specific annotations only on egress; both: Strip the dispatch router specific annotations on both ingress and egress; no - do not strip dispatch router specific annotations",
                     "create": true
-                } 
+                },
+                "linkCapacity": {
+                    "type": "integer",
+                    "create": true,
+                    "required": false,
+                    "description": "The capacity of links within this connection, in terms of message deliveries.  The capacity is the number of messages that can be in-flight concurrently for each link."
+                }
             }
         },
 
@@ -700,6 +706,12 @@
                     "description": "['in', 'out', 'both', 'no'] in: Strip the dispatch router specific annotations only on ingress; out: Strip the dispatch router specific annotations only on egress; both: Strip the dispatch router specific annotations on both ingress and egress; no - do not strip dispatch router specific annotations",
                     "create": true
                 },
+                "linkCapacity": {
+                    "type": "integer",
+                    "create": true,
+                    "required": false,
+                    "description": "The capacity of links within this connection, in terms of message deliveries.  The capacity is the number of messages that can be in-flight concurrently for each link."
+                },
                 "saslUsername": {
                     "type": "string",
                     "required": false,

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ceb3b389/src/connection_manager.c
----------------------------------------------------------------------
diff --git a/src/connection_manager.c b/src/connection_manager.c
index 67495ac..19d4b2e 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -150,6 +150,17 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
     config->sasl_password        = qd_entity_opt_string(entity, "saslPassword", 0); CHECK();
     config->sasl_mechanisms      = qd_entity_opt_string(entity, "saslMechanisms", 0); CHECK();
     config->ssl_enabled          = has_attrs(entity, ssl_attributes, ssl_attributes_count);
+    config->link_capacity        = qd_entity_opt_long(entity, "linkCapacity", 0); CHECK();
+
+    //
+    // Handle the defaults for link capacity.
+    //
+    if (config->link_capacity == 0) {
+        if (strcmp("inter-router", config->role) == 0)
+            config->link_capacity = 100000; // This is effectively infinite since session flow control will be more stringent.
+        else
+            config->link_capacity = 10;
+    }
 
     //
     // For now we are hardwiring this attribute to true.  If there's an outcry from the

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ceb3b389/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 5758ba9..2521750 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -61,7 +61,8 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t            *core,
                                         const char            *label,
                                         const char            *remote_container_id,
                                         bool                   strip_annotations_in,
-                                        bool                   strip_annotations_out)
+                                        bool                   strip_annotations_out,
+                                        int                    link_capacity)
 {
     qdr_action_t     *action = qdr_action(qdr_connection_opened_CT, "connection_opened");
     qdr_connection_t *conn   = new_qdr_connection_t();
@@ -73,6 +74,7 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t            *core,
     conn->role                  = role;
     conn->strip_annotations_in  = strip_annotations_in;
     conn->strip_annotations_out = strip_annotations_out;
+    conn->link_capacity         = link_capacity;
     conn->mask_bit              = -1;
     DEQ_INIT(conn->links);
     DEQ_INIT(conn->work_list);
@@ -270,9 +272,9 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
     link->name = (char*) malloc(strlen(name) + 1);
     strcpy(link->name, name);
     link->link_direction = dir;
-    link->capacity = 32;  // TODO - make this configurable
-    link->admin_enabled = true;
-    link->oper_status   = QDR_LINK_OPER_DOWN;
+    link->capacity       = conn->link_capacity;
+    link->admin_enabled  = true;
+    link->oper_status    = QDR_LINK_OPER_DOWN;
 
     link->strip_annotations_in  = conn->strip_annotations_in;
     link->strip_annotations_out = conn->strip_annotations_out;
@@ -513,7 +515,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t       *core,
     link->conn           = conn;
     link->link_type      = link_type;
     link->link_direction = dir;
-    link->capacity       = 32; // TODO - make this configurable
+    link->capacity       = conn->link_capacity;
     link->name           = (char*) malloc(QDR_DISCRIMINATOR_SIZE + 8);
     qdr_generate_link_name("qdlink", link->name, QDR_DISCRIMINATOR_SIZE + 8);
     link->admin_enabled  = true;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ceb3b389/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index fed9f88..801eb9a 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -413,6 +413,7 @@ struct qdr_connection_t {
     qdr_conn_identifier_t      *conn_id;
     bool                        strip_annotations_in;
     bool                        strip_annotations_out;
+    int                         link_capacity;
     int                         mask_bit;
     qdr_connection_work_list_t  work_list;
     sys_mutex_t                *work_lock;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ceb3b389/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 23b2602..530ca9f 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -45,7 +45,8 @@ static void qd_router_connection_get_config(const qd_connection_t  *conn,
                                             qdr_connection_role_t  *role,
                                             const char            **name,
                                             bool                   *strip_annotations_in,
-                                            bool                   *strip_annotations_out)
+                                            bool                   *strip_annotations_out,
+                                            int                    *link_capacity)
 {
     if (conn) {
         const qd_server_config_t *cf = qd_connection_config(conn);
@@ -53,6 +54,8 @@ static void qd_router_connection_get_config(const qd_connection_t  *conn,
         *strip_annotations_in  = cf ? cf->strip_inbound_annotations  : false;
         *strip_annotations_out = cf ? cf->strip_outbound_annotations : false;
 
+        *link_capacity = cf->link_capacity;
+
         if        (cf && strcmp(cf->role, router_role) == 0) {
             *strip_annotations_in  = false;
             *strip_annotations_out = false;
@@ -496,15 +499,16 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
     qdr_connection_role_t  role = 0;
     bool                   strip_annotations_in = false;
     bool                   strip_annotations_out = false;
+    int                    link_capacity = 1;
     const char            *name = 0;
     pn_connection_t       *pn_conn = qd_connection_pn(conn);
 
     qd_router_connection_get_config(conn, &role, &name,
-                                    &strip_annotations_in, &strip_annotations_out);
+                                    &strip_annotations_in, &strip_annotations_out, &link_capacity);
 
     qdr_connection_t *qdrc = qdr_connection_opened(router->router_core, inbound, role, name,
                                                    pn_connection_remote_container(pn_conn),
-                                                   strip_annotations_in, strip_annotations_out);
+                                                   strip_annotations_in, strip_annotations_out, link_capacity);
 
     qd_connection_set_context(conn, qdrc);
     qdr_connection_set_context(qdrc, conn);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-dispatch git commit: DISPATCH-179 - Release deliveries on lost links.

Posted by tr...@apache.org.
DISPATCH-179 - Release deliveries on lost links.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ba920ba1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ba920ba1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ba920ba1

Branch: refs/heads/master
Commit: ba920ba1488bf259ffc6f2ba5552c6710252cf05
Parents: ceb3b38
Author: Ted Ross <tr...@redhat.com>
Authored: Sat Apr 2 14:39:36 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Sat Apr 2 14:39:36 2016 -0400

----------------------------------------------------------------------
 src/router_core/transfer.c        | 10 +++++-
 tests/system_tests_one_router.py  | 55 +++++++++++++++++++++++++++++++
 tests/system_tests_two_routers.py | 60 ++++++++++++++++++++++++++++++++++
 3 files changed, 124 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ba920ba1/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 883801b..f05e96f 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -248,8 +248,16 @@ qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery)
 // In-Thread Functions
 //==================================================================================
 
-void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery)
+void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *dlv)
 {
+    bool push = dlv->disposition != PN_RELEASED;
+
+    dlv->disposition = PN_RELEASED;
+    dlv->settled = true;
+    bool moved = qdr_delivery_settled_CT(core, dlv);
+
+    if (push || moved)
+        qdr_delivery_push_CT(core, dlv);
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ba920ba1/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 028ba1a..94b32dd 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -1053,6 +1053,15 @@ class RouterTest(TestCase):
         self.assertTrue(send_settle_mode_test.message_received)
         self.assertTrue(send_settle_mode_test.delivery_already_settled)
 
+    def test_14_excess_deliveries_released(self):
+        """
+        Message-route a series of deliveries where the receiver provides credit for a subset and
+        once received, closes the link.  The remaining deliveries should be released back to the sender.
+        """
+        test = ExcessDeliveriesReleasedTest(self.address)
+        test.run()
+        self.assertEqual(None, test.error)
+
 
 HELLO_WORLD = "Hello World!"
 
@@ -1089,5 +1098,51 @@ class SndSettleModeTest(MessagingHandler):
     def run(self):
         Container(self).run()
 
+
+class ExcessDeliveriesReleasedTest(MessagingHandler):
+    def __init__(self, address):
+        super(ExcessDeliveriesReleasedTest, self).__init__(prefetch=0)
+        self.address = address
+        self.dest = "closest.EDRtest"
+        self.error = None
+        self.sender = None
+        self.receiver = None
+        self.n_sent     = 0
+        self.n_received = 0
+        self.n_accepted = 0
+        self.n_released = 0
+
+    def on_start(self, event):
+        conn = event.container.connect(self.address)
+        self.sender   = event.container.create_sender(conn, self.dest)
+        self.receiver = event.container.create_receiver(conn, self.dest)
+        self.receiver.flow(6)
+
+    def on_sendable(self, event):
+        for i in range(10 - self.n_sent):
+            msg = Message(body=i)
+            event.sender.send(msg)
+            self.n_sent += 1
+
+    def on_accepted(self, event):
+        self.n_accepted += 1
+
+    def on_released(self, event):
+        self.n_released += 1
+        if self.n_released == 4:
+            if self.n_accepted != 6:
+                self.error = "Expected 6 accepted, got %d" % self.n_accepted
+            if self.n_received != 6:
+                self.error = "Expected 6 received, got %d" % self.n_received
+            event.connection.close()
+
+    def on_message(self, event):
+        self.n_received += 1
+        if self.n_received == 6:
+            self.receiver.close()
+
+    def run(self):
+        Container(self).run()
+
 if __name__ == '__main__':
     unittest.main(main_module())

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ba920ba1/tests/system_tests_two_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index 8962a7c..2425b7f 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -20,6 +20,8 @@
 import unittest, os
 from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable, Timeout
 from system_test import TestCase, Qdrouterd, main_module, DIR
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, AtMostOnce, AtLeastOnce
 
 # PROTON-828:
 try:
@@ -994,6 +996,64 @@ class RouterTest(TestCase):
         M1.stop()
         M2.stop()
 
+    def test_14_excess_deliveries_released(self):
+        """
+        Message-route a series of deliveries where the receiver provides credit for a subset and
+        once received, closes the link.  The remaining deliveries should be released back to the sender.
+        """
+        test = ExcessDeliveriesReleasedTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+class ExcessDeliveriesReleasedTest(MessagingHandler):
+    def __init__(self, address1, address2):
+        super(ExcessDeliveriesReleasedTest, self).__init__(prefetch=0)
+        self.address1 = address1
+        self.address2 = address2
+        self.dest = "closest.EDRtest"
+        self.error = None
+        self.sender = None
+        self.receiver = None
+        self.n_sent     = 0
+        self.n_received = 0
+        self.n_accepted = 0
+        self.n_released = 0
+
+    def on_start(self, event):
+        self.conn1 = event.container.connect(self.address1)
+        self.conn2 = event.container.connect(self.address2)
+        self.sender   = event.container.create_sender(self.conn1, self.dest)
+        self.receiver = event.container.create_receiver(self.conn2, self.dest)
+        self.receiver.flow(6)
+
+    def on_sendable(self, event):
+        for i in range(10 - self.n_sent):
+            msg = Message(body=i)
+            event.sender.send(msg)
+            self.n_sent += 1
+
+    def on_accepted(self, event):
+        self.n_accepted += 1
+
+    def on_released(self, event):
+        self.n_released += 1
+        if self.n_released == 4:
+            if self.n_accepted != 6:
+                self.error = "Expected 6 accepted, got %d" % self.n_accepted
+            if self.n_received != 6:
+                self.error = "Expected 6 received, got %d" % self.n_received
+            self.conn1.close()
+            self.conn2.close()
+
+    def on_message(self, event):
+        self.n_received += 1
+        if self.n_received == 6:
+            self.receiver.close()
+
+    def run(self):
+        Container(self).run()
+
 
 
 try:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org