You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2018/03/14 22:11:44 UTC
[1/3] qpid-dispatch git commit: DISPATCH-942: introduce new router
config option to allow/disallow "resumable" links
Repository: qpid-dispatch
Updated Branches:
refs/heads/master dd8948e4d -> 0f05f836f
DISPATCH-942: introduce new router config option to allow/disallow "resumable" links
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/e62b5fb2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/e62b5fb2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/e62b5fb2
Branch: refs/heads/master
Commit: e62b5fb211f20281a8aeac5e324b3f391610effa
Parents: dd8948e
Author: Gordon Sim <gs...@redhat.com>
Authored: Tue Mar 13 17:15:35 2018 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Tue Mar 13 20:06:43 2018 +0000
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 12 ++
python/qpid_dispatch/management/qdrouter.json | 7 +
src/dispatch.c | 1 +
src/dispatch_private.h | 1 +
src/router_core/agent_router.c | 2 +
src/router_core/agent_router.h | 2 +-
src/router_core/connections.c | 30 ++++-
src/router_core/router_core_private.h | 1 +
src/router_core/terminus.c | 5 +
tests/CMakeLists.txt | 1 +
..._tests_disallow_link_resumable_link_route.py | 127 +++++++++++++++++++
11 files changed, 181 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 5b964e9..8f144b0 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -329,6 +329,18 @@ bool qdr_terminus_is_coordinator(qdr_terminus_t *term);
bool qdr_terminus_is_dynamic(qdr_terminus_t *term);
/**
+ * qdr_terminus_survives_disconnect
+ *
+ * Indicate whether this terminus will survive disconnection (i.e. if
+ * state is expected to be kept).
+ *
+ * @param term A qdr_terminus pointer returned by qdr_terminus()
+ * @return true iff the terminus has a timeout greater than 0 or an
+ * expiry-policy of never
+ */
+bool qdr_terminus_survives_disconnect(qdr_terminus_t *term);
+
+/**
* qdr_terminus_set_address
*
* Set the terminus address
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index fbae3f0..e1a26ea 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -548,6 +548,13 @@
"required": false,
"default": false
},
+ "allowResumableLinkRoute": {
+ "type": "boolean",
+ "description": "Whether links can be routed where timeout is non-zero or expiry-policy is not link-detach",
+ "create": true,
+ "required": false,
+ "default": true
+ },
"defaultDistribution": {
"type": ["multicast", "closest", "balanced", "unavailable"],
"description": "Default forwarding treatment for any address without a specified treatment. multicast - one copy of each message delivered to all subscribers; closest - messages delivered to only the closest subscriber; balanced - messages delivered to one subscriber with load balanced across subscribers; unavailable - this address is unavailable, link attaches to an address of unavilable distribution will be rejected.",
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/src/dispatch.c
----------------------------------------------------------------------
diff --git a/src/dispatch.c b/src/dispatch.c
index e760c06..7564c0b 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -184,6 +184,7 @@ qd_error_t qd_dispatch_configure_router(qd_dispatch_t *qd, qd_entity_t *entity)
qd->router_mode = qd_entity_get_long(entity, "mode"); QD_ERROR_RET();
qd->thread_count = qd_entity_opt_long(entity, "workerThreads", 4); QD_ERROR_RET();
qd->allow_unsettled_multicast = qd_entity_opt_bool(entity, "allowUnsettledMulticast", false); QD_ERROR_RET();
+ qd->allow_resumable_link_route = qd_entity_opt_bool(entity, "allowResumableLinkRoute", true); QD_ERROR_RET();
if (! qd->sasl_config_path) {
qd->sasl_config_path = qd_entity_opt_string(entity, "saslConfigPath", 0); QD_ERROR_RET();
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/src/dispatch_private.h
----------------------------------------------------------------------
diff --git a/src/dispatch_private.h b/src/dispatch_private.h
index 7e54ade..094baba 100644
--- a/src/dispatch_private.h
+++ b/src/dispatch_private.h
@@ -58,6 +58,7 @@ struct qd_dispatch_t {
char *router_id;
qd_router_mode_t router_mode;
bool allow_unsettled_multicast;
+ bool allow_resumable_link_route;
};
/**
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/src/router_core/agent_router.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_router.c b/src/router_core/agent_router.c
index 6577bbf..d4aa99a 100644
--- a/src/router_core/agent_router.c
+++ b/src/router_core/agent_router.c
@@ -56,6 +56,7 @@
#define QDR_ROUTER_DELIVERIES_TRANSIT 30
#define QDR_ROUTER_DELIVERIES_INGRESS_ROUTE_CONTAINER 31
#define QDR_ROUTER_DELIVERIES_EGRESS_ROUTE_CONTAINER 32
+#define QDR_ROUTER_RESUMABLE_LINK_ROUTE 33
const char *qdr_router_columns[] =
@@ -92,6 +93,7 @@ const char *qdr_router_columns[] =
"deliveriesTransit",
"deliveriesIngressRouteContainer",
"deliveriesEgressRouteContainer",
+ "resumableLinkRouteDefault",
0};
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/src/router_core/agent_router.h
----------------------------------------------------------------------
diff --git a/src/router_core/agent_router.h b/src/router_core/agent_router.h
index 6cb9b18..3019672 100644
--- a/src/router_core/agent_router.h
+++ b/src/router_core/agent_router.h
@@ -21,7 +21,7 @@
#include "router_core_private.h"
-#define QDR_ROUTER_COLUMN_COUNT 33
+#define QDR_ROUTER_COLUMN_COUNT 34
const char *qdr_router_columns[QDR_ROUTER_COLUMN_COUNT + 1];
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 77efe7e..7add23a 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -922,6 +922,10 @@ void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t
"linkRoute to a coordinator must be configured to use transactions.");
break;
+ case QDR_CONDITION_INVALID_LINK_EXPIRATION:
+ work->error = qdr_error("qd:link-expiration", "Requested link expiration not allowed");
+ break;
+
case QDR_CONDITION_NONE:
work->error = 0;
break;
@@ -1340,7 +1344,6 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
qdr_connection_free(conn);
}
-
static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
if (discard)
@@ -1419,11 +1422,18 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
//
// This is a link-routed destination, forward the attach to the next hop
//
- success = qdr_forward_attach_CT(core, addr, link, source, target);
- if (!success) {
- qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
+ if (qdr_terminus_survives_disconnect(target) && !core->qd->allow_resumable_link_route) {
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_INVALID_LINK_EXPIRATION, true);
qdr_terminus_free(source);
qdr_terminus_free(target);
+ } else {
+ success = qdr_forward_attach_CT(core, addr, link, source, target);
+
+ if (!success) {
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
+ qdr_terminus_free(source);
+ qdr_terminus_free(target);
+ }
}
}
@@ -1500,11 +1510,17 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
//
// This is a link-routed destination, forward the attach to the next hop
//
- bool success = qdr_forward_attach_CT(core, addr, link, source, target);
- if (!success) {
- qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
+ if (qdr_terminus_survives_disconnect(source) && !core->qd->allow_resumable_link_route) {
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_INVALID_LINK_EXPIRATION, true);
qdr_terminus_free(source);
qdr_terminus_free(target);
+ } else {
+ bool success = qdr_forward_attach_CT(core, addr, link, source, target);
+ if (!success) {
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
+ qdr_terminus_free(source);
+ qdr_terminus_free(target);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 389857b..c58ef5b 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -49,6 +49,7 @@ typedef enum {
QDR_CONDITION_FORBIDDEN,
QDR_CONDITION_WRONG_ROLE,
QDR_CONDITION_COORDINATOR_PRECONDITION_FAILED,
+ QDR_CONDITION_INVALID_LINK_EXPIRATION,
QDR_CONDITION_NONE
} qdr_condition_t;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/src/router_core/terminus.c
----------------------------------------------------------------------
diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c
index 713591a..4c0e0a3 100644
--- a/src/router_core/terminus.c
+++ b/src/router_core/terminus.c
@@ -19,6 +19,7 @@
#include "router_core_private.h"
#include <strings.h>
+#include <stdio.h>
struct qdr_terminus_t {
qdr_field_t *address;
@@ -158,6 +159,10 @@ bool qdr_terminus_is_dynamic(qdr_terminus_t *term)
return term->dynamic;
}
+bool qdr_terminus_survives_disconnect(qdr_terminus_t *term)
+{
+ return term->timeout > 0 || term->expiry_policy == PN_EXPIRE_NEVER;
+}
void qdr_terminus_set_address(qdr_terminus_t *term, const char *addr)
{
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index ee55728..ff8f787 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -106,6 +106,7 @@ foreach(py_test_module
system_tests_topology
system_tests_topology_disposition
system_tests_topology_addition
+ system_tests_disallow_link_resumable_link_route
${SYSTEM_TESTS_HTTP}
)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e62b5fb2/tests/system_tests_disallow_link_resumable_link_route.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_disallow_link_resumable_link_route.py b/tests/system_tests_disallow_link_resumable_link_route.py
new file mode 100644
index 0000000..bac12ff
--- /dev/null
+++ b/tests/system_tests_disallow_link_resumable_link_route.py
@@ -0,0 +1,127 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest2 as unittest
+from time import sleep, time
+from subprocess import PIPE, STDOUT
+
+from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process
+
+from proton import Message, Terminus
+from proton.reactor import DurableSubscription, SenderOption
+from proton.utils import BlockingConnection, LinkDetached
+
+class SenderExpiry(SenderOption):
+ def __init__(self, expiry):
+ self.expiry = expiry
+
+ def apply(self, sender):
+ sender.target.expiry_policy = self.expiry
+
+class SenderTimeout(SenderOption):
+ def __init__(self, timeout):
+ self.timeout = timeout
+
+ def apply(self, sender):
+ sender.target.timeout = self.timeout
+
+class LinkRouteTest(TestCase):
+ @classmethod
+ def get_router(cls, index):
+ return cls.routers[index]
+
+ @classmethod
+ def setUpClass(cls):
+        """Start two routers: standalone 'A' (acting as the broker) and interior 'B' (link routes, resumable links disallowed)"""
+ super(LinkRouteTest, cls).setUpClass()
+
+ def router(name, config):
+ config = Qdrouterd.Config(config)
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
+
+ cls.routers = []
+ a_listener_port = cls.tester.get_port()
+ b_listener_port = cls.tester.get_port()
+
+ router('A',
+ [
+ ('router', {'mode': 'standalone', 'id': 'QDR.A'}),
+ ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
+ ])
+ router('B',
+ [
+ #disallow resumable links
+ ('router', {'mode': 'interior', 'id': 'QDR.B', 'allowResumableLinkRoute':False}),
+ ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': b_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
+ #define link routes
+ ('connector', {'name': 'broker', 'role': 'route-container', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
+ ('linkRoute', {'prefix': 'org.apache', 'containerId': 'QDR.A', 'dir': 'in'}),
+ ('linkRoute', {'prefix': 'org.apache', 'containerId': 'QDR.A', 'dir': 'out'}),
+ ]
+ )
+ sleep(2)
+
+
+ def test_normal_receiver_allowed(self):
+ addr = self.routers[1].addresses[0]
+
+ connection = BlockingConnection(addr)
+ receiver = connection.create_receiver(address="org.apache")
+ connection.close()
+
+ def test_resumable_receiver_disallowed(self):
+ addr = self.routers[1].addresses[0]
+
+ connection = BlockingConnection(addr)
+ try:
+ receiver = connection.create_receiver(address="org.apache", options=[DurableSubscription()])
+ self.fail("link should have been detached")
+ except LinkDetached, e: None
+ connection.close()
+
+ def test_normal_sender_allowed(self):
+ addr = self.routers[1].addresses[0]
+
+ connection = BlockingConnection(addr)
+ sender = connection.create_sender(address="org.apache")
+ connection.close()
+
+ def test_expire_never_sender_disallowed(self):
+ addr = self.routers[1].addresses[0]
+
+ connection = BlockingConnection(addr)
+ try:
+ sender = connection.create_sender(address="org.apache", options=[SenderExpiry(Terminus.EXPIRE_NEVER)])
+ self.fail("link should have been detached")
+ except LinkDetached, e: None
+ connection.close()
+
+ def test_non_zero_timeout_sender_disallowed(self):
+ addr = self.routers[1].addresses[0]
+
+ connection = BlockingConnection(addr)
+ try:
+ sender = connection.create_sender(address="org.apache", options=[SenderTimeout(10)])
+ self.fail("link should have been detached")
+ except LinkDetached, e: None
+ connection.close()
+
+
+if __name__ == '__main__':
+ unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/3] qpid-dispatch git commit: DISPATCH-944: Ensure socket is closed
Posted by gs...@apache.org.
DISPATCH-944: Ensure socket is closed
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/0f05f836
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/0f05f836
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/0f05f836
Branch: refs/heads/master
Commit: 0f05f836f12390e898babcac234f176778670493
Parents: 9bcf083
Author: Gordon Sim <gs...@redhat.com>
Authored: Wed Mar 14 22:04:21 2018 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Wed Mar 14 22:06:07 2018 +0000
----------------------------------------------------------------------
src/remote_sasl.c | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0f05f836/src/remote_sasl.c
----------------------------------------------------------------------
diff --git a/src/remote_sasl.c b/src/remote_sasl.c
index 65dcab4..54fd7c8 100644
--- a/src/remote_sasl.c
+++ b/src/remote_sasl.c
@@ -376,6 +376,8 @@ static void remote_sasl_process_outcome(pn_transport_t *transport)
//only consider complete if failed; if successful wait for the open frame
if (impl->outcome != PN_SASL_OK && !notify_upstream(impl, DOWNSTREAM_OUTCOME_RECEIVED)) {
pnx_sasl_set_desired_state(transport, SASL_ERROR);
+ pn_transport_close_tail(transport);
+ pn_transport_close_head(transport);
}
}
}
@@ -641,6 +643,8 @@ void qdr_handle_authentication_service_connection_event(pn_event_t *e)
//close downstream connection
pn_connection_close(conn);
+ pn_transport_close_tail(transport);
+ pn_transport_close_head(transport);
} else if (pn_event_type(e) == PN_CONNECTION_REMOTE_CLOSE) {
qd_log(auth_service_log, QD_LOG_DEBUG, "authentication service closed connection");
pn_connection_close(conn);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/3] qpid-dispatch git commit: DISPATCH-943: do not wakeup the other
connection on free as it may also be being freed
Posted by gs...@apache.org.
DISPATCH-943: do not wakeup the other connection on free as it may also be being freed
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/9bcf0831
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/9bcf0831
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/9bcf0831
Branch: refs/heads/master
Commit: 9bcf0831386bfd0cef1591e7a672624c7d2c1849
Parents: e62b5fb
Author: Gordon Sim <gs...@redhat.com>
Authored: Wed Mar 14 21:42:20 2018 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Wed Mar 14 22:05:44 2018 +0000
----------------------------------------------------------------------
src/remote_sasl.c | 4 ----
1 file changed, 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9bcf0831/src/remote_sasl.c
----------------------------------------------------------------------
diff --git a/src/remote_sasl.c b/src/remote_sasl.c
index cda58c6..65dcab4 100644
--- a/src/remote_sasl.c
+++ b/src/remote_sasl.c
@@ -243,15 +243,11 @@ static void remote_sasl_free(pn_transport_t *transport)
impl->downstream_released = true;
if (impl->upstream_released) {
delete_qdr_sasl_relay_t(impl);
- } else {
- pn_connection_wake(impl->upstream);
}
} else {
impl->upstream_released = true;
if (impl->downstream_released) {
delete_qdr_sasl_relay_t(impl);
- } else {
- pn_connection_wake(impl->downstream);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org