You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/03/19 00:06:40 UTC
[22/50] [abbrv] qpid-dispatch git commit: DISPATCH-232 - Add capability to delete listeners and connectors via the qdmanage DELETE operation
DISPATCH-232 - Add capability to delete listeners and connectors via the qdmanage DELETE operation
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/68aab381
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/68aab381
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/68aab381
Branch: refs/heads/master
Commit: 68aab3815919a549fae4d44014fbce813ac57704
Parents: 722b0eb
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Mon Mar 7 22:52:58 2016 -0500
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Thu Mar 10 17:02:43 2016 -0500
----------------------------------------------------------------------
include/qpid/dispatch/connection_manager.h | 11 ++
python/qpid_dispatch/management/qdrouter.json | 4 +-
python/qpid_dispatch_internal/dispatch.py | 10 +-
.../qpid_dispatch_internal/management/agent.py | 32 +++--
src/connection_manager.c | 71 +++++++++-
tests/system_tests_qdmanage.py | 129 ++++++++++++++++---
6 files changed, 218 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/68aab381/include/qpid/dispatch/connection_manager.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/connection_manager.h b/include/qpid/dispatch/connection_manager.h
index 1c43a8b..686cf3b 100644
--- a/include/qpid/dispatch/connection_manager.h
+++ b/include/qpid/dispatch/connection_manager.h
@@ -47,6 +47,17 @@ qd_connection_manager_t *qd_connection_manager(qd_dispatch_t *qd);
*/
void qd_connection_manager_free(qd_connection_manager_t *cm);
+/**
+ * Free all the resources associated with a config listener
+ */
+void qd_config_listener_free(qd_config_listener_t *cl);
+
+
+/**
+ * Free all the resources associated with a config connector
+ */
+void qd_config_connector_free(qd_config_connector_t *cl);
+
/**
* Start the configured Listeners and Connectors
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/68aab381/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 038b161..327e504 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -568,7 +568,7 @@
"listener": {
"description": "Listens for incoming connections to the router.",
"extends": "configurationEntity",
- "operations": ["CREATE"],
+ "operations": ["CREATE", "DELETE"],
"annotations": [
"addrPort",
"connectionRole",
@@ -643,7 +643,7 @@
"connector": {
"description": "Establishes an outgoing connections from the router.",
"extends": "configurationEntity",
- "operations": ["CREATE"],
+ "operations": ["CREATE", "DELETE"],
"annotations": [
"addrPort",
"connectionRole",
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/68aab381/python/qpid_dispatch_internal/dispatch.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/dispatch.py b/python/qpid_dispatch_internal/dispatch.py
index a1b6e8b..49f9a83 100644
--- a/python/qpid_dispatch_internal/dispatch.py
+++ b/python/qpid_dispatch_internal/dispatch.py
@@ -59,12 +59,17 @@ class QdDll(ctypes.PyDLL):
self._prototype(self.qd_dispatch_configure_container, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_configure_router, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_prepare, None, [self.qd_dispatch_p])
- self._prototype(self.qd_dispatch_configure_listener, None, [self.qd_dispatch_p, py_object])
- self._prototype(self.qd_dispatch_configure_connector, None, [self.qd_dispatch_p, py_object])
+ self._prototype(self.qd_dispatch_configure_listener, ctypes.c_void_p, [self.qd_dispatch_p, py_object])
+ self._prototype(self.qd_dispatch_configure_connector, ctypes.c_void_p, [self.qd_dispatch_p, py_object])
+
+ self._prototype(self.qd_connection_manager_delete_listener, None, [self.qd_dispatch_p, ctypes.c_void_p])
+ self._prototype(self.qd_connection_manager_delete_connector, None, [self.qd_dispatch_p, ctypes.c_void_p])
+
self._prototype(self.qd_dispatch_configure_address, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_configure_waypoint, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_configure_lrp, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_configure_route, None, [self.qd_dispatch_p, py_object])
+
self._prototype(self.qd_dispatch_set_agent, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_router_setup_late, None, [self.qd_dispatch_p])
@@ -73,7 +78,6 @@ class QdDll(ctypes.PyDLL):
self._prototype(self.qd_dispatch_router_unlock, None, [self.qd_dispatch_p])
self._prototype(self.qd_connection_manager_start, None, [self.qd_dispatch_p])
- #self._prototype(self.qd_waypoint_activate_all, None, [self.qd_dispatch_p])
self._prototype(self.qd_entity_refresh_begin, c_long, [py_object])
self._prototype(self.qd_entity_refresh_end, None, [])
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/68aab381/python/qpid_dispatch_internal/management/agent.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/agent.py b/python/qpid_dispatch_internal/management/agent.py
index c396366..f5443d4 100644
--- a/python/qpid_dispatch_internal/management/agent.py
+++ b/python/qpid_dispatch_internal/management/agent.py
@@ -211,8 +211,8 @@ class EntityAdapter(SchemaEntity):
def delete(self, request):
"""Handle delete request from client"""
- self._agent.remove(self)
self._delete()
+ self._agent.remove(self)
return (NO_CONTENT, {})
def _delete(self):
@@ -274,16 +274,24 @@ def _addr_port_identifier(entity):
class ListenerEntity(EntityAdapter):
def create(self):
- self._qd.qd_dispatch_configure_listener(self._dispatch, self)
+ config_listener = self._qd.qd_dispatch_configure_listener(self._dispatch, self)
self._qd.qd_connection_manager_start(self._dispatch)
+ return config_listener
- def _identifier(self): return _addr_port_identifier(self)
+ def _delete(self):
+ self._qd.qd_connection_manager_delete_listener(self._dispatch, self._implementations[0].key)
+ def _identifier(self): return _addr_port_identifier(self)
class ConnectorEntity(EntityAdapter):
def create(self):
- self._qd.qd_dispatch_configure_connector(self._dispatch, self)
+ config_connector = self._qd.qd_dispatch_configure_connector(self._dispatch, self)
self._qd.qd_connection_manager_start(self._dispatch)
+ return config_connector
+
+ def _delete(self):
+ """Delete the config connector backing this entity via the connection manager"""
+ self._qd.qd_connection_manager_delete_connector(self._dispatch, self._implementations[0].key)
def _identifier(self): return _addr_port_identifier(self)
@@ -399,16 +407,18 @@ class EntityCache(object):
self.schema.validate_full(chain(iter([entity]), iter(self.entities)))
self.entities.append(entity)
- def _add_implementation(self, implementation):
+ def _add_implementation(self, implementation, adapter=None):
"""Create an adapter to wrap the implementation object and add it"""
cls = self.agent.entity_class(implementation.entity_type)
- adapter = cls(self.agent, implementation.entity_type, validate=False)
+ if not adapter:
+ adapter = cls(self.agent, implementation.entity_type, validate=False)
self.implementations[implementation.key] = adapter
adapter._add_implementation(implementation)
adapter._refresh()
self.add(adapter)
- def add_implementation(self, implementation): self._add_implementation(implementation)
+ def add_implementation(self, implementation, adapter=None):
+ self._add_implementation(implementation, adapter=adapter)
def _remove(self, entity):
try:
@@ -695,8 +705,12 @@ class Agent(object):
def _create(self, attributes):
"""Create an entity, called externally or from configuration file."""
entity = self.create_entity(attributes)
- self.add_entity(entity)
- entity.create()
+ pointer = entity.create()
+ if pointer:
+ cimplementation = CImplementation(self.qd, entity.entity_type, pointer)
+ self.entities.add_implementation(cimplementation, entity)
+ else:
+ self.add_entity(entity)
return entity
def create(self, request):
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/68aab381/src/connection_manager.c
----------------------------------------------------------------------
diff --git a/src/connection_manager.c b/src/connection_manager.c
index 794ddee..039202e 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -183,30 +183,53 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
return qd_error_code();
}
-void qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_entity_t *entity)
+qd_config_listener_t *qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_entity_t *entity)
{
+ qd_error_clear();
qd_connection_manager_t *cm = qd->connection_manager;
qd_config_listener_t *cl = NEW(qd_config_listener_t);
cl->is_connector = false;
cl->listener = 0;
- load_server_config(qd, &cl->configuration, entity);
+ if (load_server_config(qd, &cl->configuration, entity) != QD_ERROR_NONE) {
+ qd_log(cm->log_source, QD_LOG_ERROR, "Unable to create config listener: %s", qd_error_message());
+ qd_config_listener_free(cl);
+ return 0;
+ }
DEQ_ITEM_INIT(cl);
DEQ_INSERT_TAIL(cm->config_listeners, cl);
qd_log(cm->log_source, QD_LOG_INFO, "Configured Listener: %s:%s role=%s",
cl->configuration.host, cl->configuration.port, cl->configuration.role);
+
+ return cl;
+}
+
+
+qd_error_t qd_entity_refresh_listener(qd_entity_t* entity, void *impl)
+{
+ return QD_ERROR_NONE;
}
-qd_error_t qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity)
+qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl)
+{
+ return QD_ERROR_NONE;
+}
+
+
+qd_config_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity)
{
qd_error_clear();
qd_connection_manager_t *cm = qd->connection_manager;
qd_config_connector_t *cc = NEW(qd_config_connector_t);
memset(cc, 0, sizeof(*cc));
cc->is_connector = true;
- if (load_server_config(qd, &cc->configuration, entity))
- return qd_error_code();
+ if (load_server_config(qd, &cc->configuration, entity) != QD_ERROR_NONE) {
+ qd_log(cm->log_source, QD_LOG_ERROR, "Unable to create config connector: %s", qd_error_message());
+ qd_config_connector_free(cc);
+ return 0;
+ }
DEQ_ITEM_INIT(cc);
+
if (strcmp(cc->configuration.role, "route-container") == 0) {
DEQ_INSERT_TAIL(cm->on_demand_connectors, cc);
qd_log(cm->log_source, QD_LOG_INFO, "Configured route-container connector: %s:%s name=%s",
@@ -217,7 +240,8 @@ qd_error_t qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entit
qd_log(cm->log_source, QD_LOG_INFO, "Configured Connector: %s:%s role=%s",
cc->configuration.host, cc->configuration.port, cc->configuration.role);
}
- return QD_ERROR_NONE;
+
+ return cc;
}
@@ -288,6 +312,41 @@ void qd_connection_manager_start(qd_dispatch_t *qd)
}
}
+void qd_config_connector_free(qd_config_connector_t *cc)
+{
+ if (cc->connector)
+ qd_server_connector_free(cc->connector);
+ free(cc);
+}
+
+void qd_config_listener_free(qd_config_listener_t *cl)
+{
+ if (cl->listener)
+ qd_server_listener_close(cl->listener);
+ qd_server_listener_free(cl->listener);
+ free(cl);
+}
+
+void qd_connection_manager_delete_listener(qd_dispatch_t *qd, void *impl)
+{
+ qd_config_listener_t *cl = (qd_config_listener_t*)impl;
+
+ if(cl) {
+ qd_server_listener_close(cl->listener);
+ DEQ_REMOVE(qd->connection_manager->config_listeners, cl);
+ qd_config_listener_free(cl);
+ }
+}
+
+void qd_connection_manager_delete_connector(qd_dispatch_t *qd, void *impl)
+{
+ qd_config_connector_t *cc = (qd_config_connector_t*)impl;
+
+ if(cc) {
+ DEQ_REMOVE(qd->connection_manager->config_connectors, cc);
+ qd_config_connector_free(cc);
+ }
+}
qd_config_connector_t *qd_connection_manager_find_on_demand(qd_dispatch_t *qd, const char *name)
{
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/68aab381/tests/system_tests_qdmanage.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_qdmanage.py b/tests/system_tests_qdmanage.py
index 2103915..f11bec4 100644
--- a/tests/system_tests_qdmanage.py
+++ b/tests/system_tests_qdmanage.py
@@ -18,7 +18,7 @@
#
import re, json, unittest, os
-from system_test import TestCase, Process, Qdrouterd, main_module, TIMEOUT, DIR
+from system_test import TestCase, Process, Qdrouterd, main_module, TIMEOUT, DIR, wait_port
from subprocess import PIPE, STDOUT
from qpid_dispatch_internal.compat import OrderedDict, dictify
from qpid_dispatch_internal.management.qdrouter import QdSchema
@@ -36,18 +36,30 @@ class QdmanageTest(TestCase):
@classmethod
def setUpClass(cls):
super(QdmanageTest, cls).setUpClass()
- config = Qdrouterd.Config([
+ cls.inter_router_port = cls.tester.get_port()
+ config_1 = Qdrouterd.Config([
+ ('container', {'workerThreads': 4, 'containerName': 'Qpid.Dispatch.Router.1'}),
+ ('router', {'mode': 'interior', 'routerId': 'R1'}),
('ssl-profile', {'name': 'server-ssl',
'cert-db': cls.ssl_file('ca-certificate.pem'),
'cert-file': cls.ssl_file('server-certificate.pem'),
'key-file': cls.ssl_file('server-private-key.pem'),
'password': 'server-password'}),
('listener', {'port': cls.tester.get_port()}),
+ ('connector', {'role': 'inter-router', 'port': cls.inter_router_port}),
('listener', {'port': cls.tester.get_port(), 'ssl-profile': 'server-ssl'})
])
- cls.router = cls.tester.qdrouterd('test-router', config, wait=True)
- def address(self): return self.router.addresses[0]
+ config_2 = Qdrouterd.Config([
+ ('container', {'workerThreads': 4, 'containerName': 'Qpid.Dispatch.Router.2'}),
+ ('router', {'mode': 'interior', 'routerId': 'R2'}),
+ ('listener', {'role': 'inter-router', 'port': cls.inter_router_port}),
+ ])
+ cls.router_2 = cls.tester.qdrouterd('test_router_2', config_2, wait=True)
+ cls.router_1 = cls.tester.qdrouterd('test_router_1', config_1, wait=True)
+
+ def address(self):
+ return self.router_1.addresses[0]
def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
p = self.popen(
@@ -94,7 +106,6 @@ class QdmanageTest(TestCase):
self.run_qdmanage('delete --name mydummy')
self.run_qdmanage('read --name=mydummy', expect=Process.EXIT_FAIL)
-
def test_stdin(self):
"""Test piping from stdin"""
def check(cmd, expect, input, copy=None):
@@ -121,22 +132,27 @@ class QdmanageTest(TestCase):
check_list('update', expect_list, json.dumps(expect_list))
def test_query(self):
- def long_type(name): return u'org.apache.qpid.dispatch.'+name
+
+ def long_type(name):
+ return u'org.apache.qpid.dispatch.'+name
+
types = ['listener', 'log', 'container', 'router']
long_types = [long_type(name) for name in types]
qall = json.loads(self.run_qdmanage('query'))
qall_types = set([e['type'] for e in qall])
- for t in long_types: self.assertIn(t, qall_types)
+ for t in long_types:
+ self.assertIn(t, qall_types)
qlistener = json.loads(self.run_qdmanage('query --type=listener'))
self.assertEqual([long_type('listener')]*2, [e['type'] for e in qlistener])
- self.assertEqual(self.router.ports[0], int(qlistener[0]['port']))
+ self.assertEqual(self.router_1.ports[0], int(qlistener[0]['port']))
qattr = json.loads(
self.run_qdmanage('query type name'))
- for e in qattr: self.assertEqual(2, len(e))
+ for e in qattr:
+ self.assertEqual(2, len(e))
def name_type(entities):
ignore_types = [long_type(t) for t in ['router.link', 'connection', 'router.address']]
@@ -158,12 +174,36 @@ class QdmanageTest(TestCase):
def test_ssl(self):
"""Simple test for SSL connection. Note system_tests_qdstat has a more complete SSL test"""
- url = Url(self.router.addresses[1], scheme="amqps")
+ url = Url(self.router_1.addresses[1], scheme="amqps")
schema = dictify(QdSchema().dump())
actual = self.run_qdmanage("GET-JSON-SCHEMA")
self.assertEquals(schema, dictify(json.loads(actual)))
- def test_add_connector(self):
+ def create(self, type, name, port):
+ create_command = 'CREATE --type=' + type + ' --name=' + name + ' addr=0.0.0.0 port=' + port
+ connector = json.loads(self.run_qdmanage(create_command))
+ return connector
+
+ def test_create_delete_connector(self):
+ long_type = 'org.apache.qpid.dispatch.connector'
+ query_command = 'QUERY --type=' + long_type
+ output = json.loads(self.run_qdmanage(query_command))
+ name = output[0]['name']
+
+ # Delete an existing connector
+ delete_command = 'DELETE --type=' + long_type + ' --name=' + name
+ self.run_qdmanage(delete_command)
+ output = json.loads(self.run_qdmanage(query_command))
+ self.assertEqual(output, [])
+
+ # Re-create the connector and then verify that the resulting connection can be read
+ self.create(long_type, name, str(QdmanageTest.inter_router_port))
+ full_name = 'connection/0.0.0.0:' + str(QdmanageTest.inter_router_port)
+ output = json.loads(self.run_qdmanage('READ --type=org.apache.qpid.dispatch.connection --name ' + full_name))
+
+ self.assertEquals(full_name, output['name'])
+
+ def test_zzz_add_connector(self):
port = self.get_port()
# dont provide role and make sure that role is defaulted to 'normal'
command = "CREATE --type=connector --name=eaconn1 port=" + str(port) + " addr=0.0.0.0"
@@ -188,17 +228,68 @@ class QdmanageTest(TestCase):
output = json.loads(self.run_qdmanage(command))
self.assertEqual("normal", output['role'])
- exception = False
- port = self.get_port()
- # provide mode as 'standalone' and role as 'inter-router'. This combination is not allowed
- command = "CREATE --type=connector --name=eaconn3 port=" + str(port) + " addr=0.0.0.0 role=inter-router"
+ def test_zzz_create_delete_listener(self):
+ long_type = 'org.apache.qpid.dispatch.listener'
+ name = 'ealistener'
+
+ listener_port = self.get_port()
+
+ listener = self.create(long_type, name, str(listener_port))
+ self.assertEquals(listener['type'], long_type)
+ self.assertEquals(listener['name'], name)
+
+ exception_occurred = False
+
try:
- output = json.loads(self.run_qdmanage(command))
+ # Try to connect to the port of the newly created listener, it should not return an error
+ wait_port(listener_port, host='127.0.0.1', timeout=2)
except Exception as e:
- self.assertTrue("BadRequestStatus: role='inter-router' only allowed with router mode='interior'" in e.message)
- exception = True
+ exception_occurred = True
- self.assertTrue(exception)
+ self.assertFalse(exception_occurred)
+
+ delete_command = 'DELETE --type=' + long_type + ' --name=' + name
+ self.run_qdmanage(delete_command)
+
+ exception_occurred = False
+ try:
+ # Try deleting an already deleted listener, this should raise an exception
+ self.run_qdmanage(delete_command)
+ except Exception as e:
+ exception_occurred = True
+ self.assertTrue("NotFoundStatus: No entity with name='" + name + "'" in e.message)
+
+ self.assertTrue(exception_occurred)
+
+ try:
+ # Try to connect to the port of the deleted listener, it should now return an error
+ wait_port(listener_port, host='127.0.0.1', timeout=2)
+ except Exception as e:
+ exception_occurred = True
+
+ self.assertTrue(exception_occurred)
+
+ # Now try the same thing with a short_type
+ short_type = 'listener'
+
+ listener_port = self.get_port()
+
+ listener = self.create(long_type, name, str(listener_port))
+ self.assertEquals(listener['type'], long_type)
+ self.assertEquals(listener['name'], name)
+
+ delete_command = 'DELETE --type=' + short_type + ' --name=' + name
+ self.run_qdmanage(delete_command)
+
+ exception_occurred = False
+
+ try:
+ # Try to connect to the port of the deleted listener, it should now return an error
+ wait_port(listener_port, host='127.0.0.1', timeout=2)
+ except Exception as e:
+ exception_occurred = True
+
+ self.assertTrue(exception_occurred)
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org