You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2018/03/14 22:11:44 UTC

[1/3] qpid-dispatch git commit: DISPATCH-942: introduce new router config option to allow/disallow "resumable" links

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master dd8948e4d -> 0f05f836f


DISPATCH-942: introduce new router config option to allow/disallow "resumable" links


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/e62b5fb2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/e62b5fb2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/e62b5fb2

Branch: refs/heads/master
Commit: e62b5fb211f20281a8aeac5e324b3f391610effa
Parents: dd8948e
Author: Gordon Sim <gs...@redhat.com>
Authored: Tue Mar 13 17:15:35 2018 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Tue Mar 13 20:06:43 2018 +0000

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h             |  12 ++
 python/qpid_dispatch/management/qdrouter.json   |   7 +
 src/dispatch.c                                  |   1 +
 src/dispatch_private.h                          |   1 +
 src/router_core/agent_router.c                  |   2 +
 src/router_core/agent_router.h                  |   2 +-
 src/router_core/connections.c                   |  30 ++++-
 src/router_core/router_core_private.h           |   1 +
 src/router_core/terminus.c                      |   5 +
 tests/CMakeLists.txt                            |   1 +
 ..._tests_disallow_link_resumable_link_route.py | 127 +++++++++++++++++++
 11 files changed, 181 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 5b964e9..8f144b0 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -329,6 +329,18 @@ bool qdr_terminus_is_coordinator(qdr_terminus_t *term);
 bool qdr_terminus_is_dynamic(qdr_terminus_t *term);
 
 /**
+ * qdr_terminus_survives_disconnect
+ *
+ * Indicate whether this terminus will survive disconnection (i.e. if
+ * state is expected to be kept).
+ *
+ * @param term A qdr_terminus pointer returned by qdr_terminus()
+ * @return true iff the terminus has a timeout greater than 0 or an
+ * expiry-policy of never
+ */
+bool qdr_terminus_survives_disconnect(qdr_terminus_t *term);
+
+/**
  * qdr_terminus_set_address
  *
  * Set the terminus address

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index fbae3f0..e1a26ea 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -548,6 +548,13 @@
                     "required": false,
                     "default": false
                 },
+                "allowResumableLinkRoute": {
+                    "type": "boolean",
+                    "description": "Whether links can be routed where timeout is non-zero or expiry-policy is never",
+                    "create": true,
+                    "required": false,
+                    "default": true
+                },
                 "defaultDistribution": {
                     "type": ["multicast", "closest", "balanced", "unavailable"],
                     "description": "Default forwarding treatment for any address without a specified treatment. multicast - one copy of each message delivered to all subscribers; closest - messages delivered to only the closest subscriber; balanced - messages delivered to one subscriber with load balanced across subscribers; unavailable - this address is unavailable, link attaches to an address of unavilable distribution will be rejected.",

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/src/dispatch.c
----------------------------------------------------------------------
diff --git a/src/dispatch.c b/src/dispatch.c
index e760c06..7564c0b 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -184,6 +184,7 @@ qd_error_t qd_dispatch_configure_router(qd_dispatch_t *qd, qd_entity_t *entity)
     qd->router_mode = qd_entity_get_long(entity, "mode"); QD_ERROR_RET();
     qd->thread_count = qd_entity_opt_long(entity, "workerThreads", 4); QD_ERROR_RET();
     qd->allow_unsettled_multicast = qd_entity_opt_bool(entity, "allowUnsettledMulticast", false); QD_ERROR_RET();
+    qd->allow_resumable_link_route = qd_entity_opt_bool(entity, "allowResumableLinkRoute", true); QD_ERROR_RET();
 
     if (! qd->sasl_config_path) {
         qd->sasl_config_path = qd_entity_opt_string(entity, "saslConfigPath", 0); QD_ERROR_RET();

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/src/dispatch_private.h
----------------------------------------------------------------------
diff --git a/src/dispatch_private.h b/src/dispatch_private.h
index 7e54ade..094baba 100644
--- a/src/dispatch_private.h
+++ b/src/dispatch_private.h
@@ -58,6 +58,7 @@ struct qd_dispatch_t {
     char  *router_id;
     qd_router_mode_t  router_mode;
     bool   allow_unsettled_multicast;
+    bool   allow_resumable_link_route;
 };
 
 /**

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/src/router_core/agent_router.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_router.c b/src/router_core/agent_router.c
index 6577bbf..d4aa99a 100644
--- a/src/router_core/agent_router.c
+++ b/src/router_core/agent_router.c
@@ -56,6 +56,7 @@
 #define QDR_ROUTER_DELIVERIES_TRANSIT                 30
 #define QDR_ROUTER_DELIVERIES_INGRESS_ROUTE_CONTAINER 31
 #define QDR_ROUTER_DELIVERIES_EGRESS_ROUTE_CONTAINER  32
+#define QDR_ROUTER_RESUMABLE_LINK_ROUTE               33
 
 
 const char *qdr_router_columns[] =
@@ -92,6 +93,7 @@ const char *qdr_router_columns[] =
      "deliveriesTransit",
      "deliveriesIngressRouteContainer",
      "deliveriesEgressRouteContainer",
+     "resumableLinkRouteDefault",
      0};
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/src/router_core/agent_router.h
----------------------------------------------------------------------
diff --git a/src/router_core/agent_router.h b/src/router_core/agent_router.h
index 6cb9b18..3019672 100644
--- a/src/router_core/agent_router.h
+++ b/src/router_core/agent_router.h
@@ -21,7 +21,7 @@
 
 #include "router_core_private.h"
 
-#define QDR_ROUTER_COLUMN_COUNT  33
+#define QDR_ROUTER_COLUMN_COUNT  34
 
 const char *qdr_router_columns[QDR_ROUTER_COLUMN_COUNT + 1];
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 77efe7e..7add23a 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -922,6 +922,10 @@ void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t
                                                             "linkRoute to a coordinator must be configured to use transactions.");
             break;
 
+        case QDR_CONDITION_INVALID_LINK_EXPIRATION:
+            work->error = qdr_error("qd:link-expiration", "Requested link expiration not allowed");
+            break;
+
         case QDR_CONDITION_NONE:
             work->error = 0;
             break;
@@ -1340,7 +1344,6 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
     qdr_connection_free(conn);
 }
 
-
 static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     if (discard)
@@ -1419,11 +1422,18 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                     //
                     // This is a link-routed destination, forward the attach to the next hop
                     //
-                    success = qdr_forward_attach_CT(core, addr, link, source, target);
-                    if (!success) {
-                        qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
+                    if (qdr_terminus_survives_disconnect(target) && !core->qd->allow_resumable_link_route) {
+                        qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_INVALID_LINK_EXPIRATION, true);
                         qdr_terminus_free(source);
                         qdr_terminus_free(target);
+                    } else {
+                        success = qdr_forward_attach_CT(core, addr, link, source, target);
+
+                        if (!success) {
+                            qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
+                            qdr_terminus_free(source);
+                            qdr_terminus_free(target);
+                        }
                     }
 
                 }
@@ -1500,11 +1510,17 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                 //
                 // This is a link-routed destination, forward the attach to the next hop
                 //
-                bool success = qdr_forward_attach_CT(core, addr, link, source, target);
-                if (!success) {
-                    qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
+                if (qdr_terminus_survives_disconnect(source) && !core->qd->allow_resumable_link_route) {
+                    qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_INVALID_LINK_EXPIRATION, true);
                     qdr_terminus_free(source);
                     qdr_terminus_free(target);
+                } else {
+                    bool success = qdr_forward_attach_CT(core, addr, link, source, target);
+                    if (!success) {
+                        qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
+                        qdr_terminus_free(source);
+                        qdr_terminus_free(target);
+                    }
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 389857b..c58ef5b 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -49,6 +49,7 @@ typedef enum {
     QDR_CONDITION_FORBIDDEN,
     QDR_CONDITION_WRONG_ROLE,
     QDR_CONDITION_COORDINATOR_PRECONDITION_FAILED,
+    QDR_CONDITION_INVALID_LINK_EXPIRATION,
     QDR_CONDITION_NONE
 } qdr_condition_t;
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/src/router_core/terminus.c
----------------------------------------------------------------------
diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c
index 713591a..4c0e0a3 100644
--- a/src/router_core/terminus.c
+++ b/src/router_core/terminus.c
@@ -19,6 +19,7 @@
 
 #include "router_core_private.h"
 #include <strings.h>
+#include <stdio.h>
 
 struct qdr_terminus_t {
     qdr_field_t            *address;
@@ -158,6 +159,10 @@ bool qdr_terminus_is_dynamic(qdr_terminus_t *term)
     return term->dynamic;
 }
 
+bool qdr_terminus_survives_disconnect(qdr_terminus_t *term)
+{
+    return term->timeout > 0 || term->expiry_policy == PN_EXPIRE_NEVER;
+}
 
 void qdr_terminus_set_address(qdr_terminus_t *term, const char *addr)
 {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index ee55728..ff8f787 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -106,6 +106,7 @@ foreach(py_test_module
     system_tests_topology
     system_tests_topology_disposition
     system_tests_topology_addition
+    system_tests_disallow_link_resumable_link_route
     ${SYSTEM_TESTS_HTTP}
     )
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/tests/system_tests_disallow_link_resumable_link_route.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_disallow_link_resumable_link_route.py b/tests/system_tests_disallow_link_resumable_link_route.py
new file mode 100644
index 0000000..bac12ff
--- /dev/null
+++ b/tests/system_tests_disallow_link_resumable_link_route.py
@@ -0,0 +1,127 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest2 as unittest
+from time import sleep, time
+from subprocess import PIPE, STDOUT
+
+from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process
+
+from proton import Message, Terminus
+from proton.reactor import DurableSubscription, SenderOption
+from proton.utils import BlockingConnection, LinkDetached
+
+class SenderExpiry(SenderOption):
+    def __init__(self, expiry):
+        self.expiry = expiry
+
+    def apply(self, sender):
+        sender.target.expiry_policy = self.expiry
+
+class SenderTimeout(SenderOption):
+    def __init__(self, timeout):
+        self.timeout = timeout
+
+    def apply(self, sender):
+        sender.target.timeout = self.timeout
+
+class LinkRouteTest(TestCase):
+    @classmethod
+    def get_router(cls, index):
+        return cls.routers[index]
+
+    @classmethod
+    def setUpClass(cls):
+        """Start two routers"""
+        super(LinkRouteTest, cls).setUpClass()
+
+        def router(name, config):
+            config = Qdrouterd.Config(config)
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
+
+        cls.routers = []
+        a_listener_port = cls.tester.get_port()
+        b_listener_port = cls.tester.get_port()
+
+        router('A',
+               [
+                   ('router', {'mode': 'standalone', 'id': 'QDR.A'}),
+                   ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
+               ])
+        router('B',
+               [
+                   #disallow resumable links
+                   ('router', {'mode': 'interior', 'id': 'QDR.B', 'allowResumableLinkRoute':False}),
+                   ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': b_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
+                   #define link routes
+                   ('connector', {'name': 'broker', 'role': 'route-container', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
+                   ('linkRoute', {'prefix': 'org.apache', 'containerId': 'QDR.A', 'dir': 'in'}),
+                   ('linkRoute', {'prefix': 'org.apache', 'containerId': 'QDR.A', 'dir': 'out'}),
+               ]
+               )
+        sleep(2)
+
+
+    def test_normal_receiver_allowed(self):
+        addr = self.routers[1].addresses[0]
+
+        connection = BlockingConnection(addr)
+        receiver = connection.create_receiver(address="org.apache")
+        connection.close()
+
+    def test_resumable_receiver_disallowed(self):
+        addr = self.routers[1].addresses[0]
+
+        connection = BlockingConnection(addr)
+        try:
+            receiver = connection.create_receiver(address="org.apache", options=[DurableSubscription()])
+            self.fail("link should have been detached")
+        except LinkDetached, e: None
+        connection.close()
+
+    def test_normal_sender_allowed(self):
+        addr = self.routers[1].addresses[0]
+
+        connection = BlockingConnection(addr)
+        sender = connection.create_sender(address="org.apache")
+        connection.close()
+
+    def test_expire_never_sender_disallowed(self):
+        addr = self.routers[1].addresses[0]
+
+        connection = BlockingConnection(addr)
+        try:
+            sender = connection.create_sender(address="org.apache", options=[SenderExpiry(Terminus.EXPIRE_NEVER)])
+            self.fail("link should have been detached")
+        except LinkDetached, e: None
+        connection.close()
+
+    def test_non_zero_timeout_sender_disallowed(self):
+        addr = self.routers[1].addresses[0]
+
+        connection = BlockingConnection(addr)
+        try:
+            sender = connection.create_sender(address="org.apache", options=[SenderTimeout(10)])
+            self.fail("link should have been detached")
+        except LinkDetached, e: None
+        connection.close()
+
+
+if __name__ == '__main__':
+    unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-dispatch git commit: DISPATCH-944: Ensure socket is closed

Posted by gs...@apache.org.
DISPATCH-944: Ensure socket is closed


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/0f05f836
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/0f05f836
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/0f05f836

Branch: refs/heads/master
Commit: 0f05f836f12390e898babcac234f176778670493
Parents: 9bcf083
Author: Gordon Sim <gs...@redhat.com>
Authored: Wed Mar 14 22:04:21 2018 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Wed Mar 14 22:06:07 2018 +0000

----------------------------------------------------------------------
 src/remote_sasl.c | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0f05f836/src/remote_sasl.c
----------------------------------------------------------------------
diff --git a/src/remote_sasl.c b/src/remote_sasl.c
index 65dcab4..54fd7c8 100644
--- a/src/remote_sasl.c
+++ b/src/remote_sasl.c
@@ -376,6 +376,8 @@ static void remote_sasl_process_outcome(pn_transport_t *transport)
             //only consider complete if failed; if successful wait for the open frame
             if (impl->outcome != PN_SASL_OK && !notify_upstream(impl, DOWNSTREAM_OUTCOME_RECEIVED)) {
                 pnx_sasl_set_desired_state(transport, SASL_ERROR);
+                pn_transport_close_tail(transport);
+                pn_transport_close_head(transport);
             }
         }
     }
@@ -641,6 +643,8 @@ void qdr_handle_authentication_service_connection_event(pn_event_t *e)
 
         //close downstream connection
         pn_connection_close(conn);
+        pn_transport_close_tail(transport);
+        pn_transport_close_head(transport);
     } else if (pn_event_type(e) == PN_CONNECTION_REMOTE_CLOSE) {
         qd_log(auth_service_log, QD_LOG_DEBUG, "authentication service closed connection");
         pn_connection_close(conn);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-dispatch git commit: DISPATCH-943: do not wakeup the other connection on free as it may also be being freed

Posted by gs...@apache.org.
DISPATCH-943: do not wakeup the other connection on free as it may also be being freed


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/9bcf0831
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/9bcf0831
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/9bcf0831

Branch: refs/heads/master
Commit: 9bcf0831386bfd0cef1591e7a672624c7d2c1849
Parents: e62b5fb
Author: Gordon Sim <gs...@redhat.com>
Authored: Wed Mar 14 21:42:20 2018 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Wed Mar 14 22:05:44 2018 +0000

----------------------------------------------------------------------
 src/remote_sasl.c | 4 ----
 1 file changed, 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9bcf0831/src/remote_sasl.c
----------------------------------------------------------------------
diff --git a/src/remote_sasl.c b/src/remote_sasl.c
index cda58c6..65dcab4 100644
--- a/src/remote_sasl.c
+++ b/src/remote_sasl.c
@@ -243,15 +243,11 @@ static void remote_sasl_free(pn_transport_t *transport)
             impl->downstream_released = true;
             if (impl->upstream_released) {
                 delete_qdr_sasl_relay_t(impl);
-            } else {
-                pn_connection_wake(impl->upstream);
             }
         } else {
             impl->upstream_released = true;
             if (impl->downstream_released) {
                 delete_qdr_sasl_relay_t(impl);
-            } else {
-                pn_connection_wake(impl->downstream);
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org