You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/03/19 00:06:40 UTC

[22/50] [abbrv] qpid-dispatch git commit: DISPATCH-232 - Add capability to delete listeners and connectors via the qdmanage DELETE operation

DISPATCH-232 - Add capability to delete listeners and connectors via the qdmanage DELETE operation


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/68aab381
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/68aab381
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/68aab381

Branch: refs/heads/master
Commit: 68aab3815919a549fae4d44014fbce813ac57704
Parents: 722b0eb
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Mon Mar 7 22:52:58 2016 -0500
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Thu Mar 10 17:02:43 2016 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/connection_manager.h      |  11 ++
 python/qpid_dispatch/management/qdrouter.json   |   4 +-
 python/qpid_dispatch_internal/dispatch.py       |  10 +-
 .../qpid_dispatch_internal/management/agent.py  |  32 +++--
 src/connection_manager.c                        |  71 +++++++++-
 tests/system_tests_qdmanage.py                  | 129 ++++++++++++++++---
 6 files changed, 218 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/68aab381/include/qpid/dispatch/connection_manager.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/connection_manager.h b/include/qpid/dispatch/connection_manager.h
index 1c43a8b..686cf3b 100644
--- a/include/qpid/dispatch/connection_manager.h
+++ b/include/qpid/dispatch/connection_manager.h
@@ -47,6 +47,17 @@ qd_connection_manager_t *qd_connection_manager(qd_dispatch_t *qd);
  */
 void qd_connection_manager_free(qd_connection_manager_t *cm);
 
+/**
+ * Free all the resources associated with a config listener
+ */
+void qd_config_listener_free(qd_config_listener_t *cl);
+
+
+/**
+ * Free all the resources associated with a config connector
+ */
+void qd_config_connector_free(qd_config_connector_t *cl);
+
 
 /**
  * Start the configured Listeners and Connectors

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/68aab381/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 038b161..327e504 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -568,7 +568,7 @@
         "listener": {
             "description": "Listens for incoming connections to the router.",
             "extends": "configurationEntity",
-            "operations": ["CREATE"],
+            "operations": ["CREATE", "DELETE"],
             "annotations": [
                 "addrPort",
                 "connectionRole",
@@ -643,7 +643,7 @@
         "connector": {
             "description": "Establishes an outgoing connections from the router.",
             "extends": "configurationEntity",
-            "operations": ["CREATE"],
+            "operations": ["CREATE", "DELETE"],
             "annotations": [
                 "addrPort",
                 "connectionRole",

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/68aab381/python/qpid_dispatch_internal/dispatch.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/dispatch.py b/python/qpid_dispatch_internal/dispatch.py
index a1b6e8b..49f9a83 100644
--- a/python/qpid_dispatch_internal/dispatch.py
+++ b/python/qpid_dispatch_internal/dispatch.py
@@ -59,12 +59,17 @@ class QdDll(ctypes.PyDLL):
         self._prototype(self.qd_dispatch_configure_container, None, [self.qd_dispatch_p, py_object])
         self._prototype(self.qd_dispatch_configure_router, None, [self.qd_dispatch_p, py_object])
         self._prototype(self.qd_dispatch_prepare, None, [self.qd_dispatch_p])
-        self._prototype(self.qd_dispatch_configure_listener, None, [self.qd_dispatch_p, py_object])
-        self._prototype(self.qd_dispatch_configure_connector, None, [self.qd_dispatch_p, py_object])
+        self._prototype(self.qd_dispatch_configure_listener, ctypes.c_void_p, [self.qd_dispatch_p, py_object])
+        self._prototype(self.qd_dispatch_configure_connector, ctypes.c_void_p, [self.qd_dispatch_p, py_object])
+
+        self._prototype(self.qd_connection_manager_delete_listener, None, [self.qd_dispatch_p, ctypes.c_void_p])
+        self._prototype(self.qd_connection_manager_delete_connector, None, [self.qd_dispatch_p, ctypes.c_void_p])
+
         self._prototype(self.qd_dispatch_configure_address, None, [self.qd_dispatch_p, py_object])
         self._prototype(self.qd_dispatch_configure_waypoint, None, [self.qd_dispatch_p, py_object])
         self._prototype(self.qd_dispatch_configure_lrp, None, [self.qd_dispatch_p, py_object])
         self._prototype(self.qd_dispatch_configure_route, None, [self.qd_dispatch_p, py_object])
+
         self._prototype(self.qd_dispatch_set_agent, None, [self.qd_dispatch_p, py_object])
 
         self._prototype(self.qd_router_setup_late, None, [self.qd_dispatch_p])
@@ -73,7 +78,6 @@ class QdDll(ctypes.PyDLL):
         self._prototype(self.qd_dispatch_router_unlock, None, [self.qd_dispatch_p])
 
         self._prototype(self.qd_connection_manager_start, None, [self.qd_dispatch_p])
-        #self._prototype(self.qd_waypoint_activate_all, None, [self.qd_dispatch_p])
         self._prototype(self.qd_entity_refresh_begin, c_long, [py_object])
         self._prototype(self.qd_entity_refresh_end, None, [])
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/68aab381/python/qpid_dispatch_internal/management/agent.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/agent.py b/python/qpid_dispatch_internal/management/agent.py
index c396366..f5443d4 100644
--- a/python/qpid_dispatch_internal/management/agent.py
+++ b/python/qpid_dispatch_internal/management/agent.py
@@ -211,8 +211,8 @@ class EntityAdapter(SchemaEntity):
 
     def delete(self, request):
         """Handle delete request from client"""
-        self._agent.remove(self)
         self._delete()
+        self._agent.remove(self)
         return (NO_CONTENT, {})
 
     def _delete(self):
@@ -274,16 +274,24 @@ def _addr_port_identifier(entity):
 
 class ListenerEntity(EntityAdapter):
     def create(self):
-        self._qd.qd_dispatch_configure_listener(self._dispatch, self)
+        config_listener = self._qd.qd_dispatch_configure_listener(self._dispatch, self)
         self._qd.qd_connection_manager_start(self._dispatch)
+        return config_listener
 
-    def _identifier(self): return _addr_port_identifier(self)
+    def _delete(self):
+        self._qd.qd_connection_manager_delete_listener(self._dispatch, self._implementations[0].key)
 
+    def _identifier(self): return _addr_port_identifier(self)
 
 class ConnectorEntity(EntityAdapter):
     def create(self):
-        self._qd.qd_dispatch_configure_connector(self._dispatch, self)
+        config_connector = self._qd.qd_dispatch_configure_connector(self._dispatch, self)
         self._qd.qd_connection_manager_start(self._dispatch)
+        return config_connector
+
+    def _delete(self):
+        """Delete this connector through the connection manager"""
+        self._qd.qd_connection_manager_delete_connector(self._dispatch, self._implementations[0].key)
 
     def _identifier(self): return _addr_port_identifier(self)
 
@@ -399,16 +407,18 @@ class EntityCache(object):
         self.schema.validate_full(chain(iter([entity]), iter(self.entities)))
         self.entities.append(entity)
 
-    def _add_implementation(self, implementation):
+    def _add_implementation(self, implementation, adapter=None):
         """Create an adapter to wrap the implementation object and add it"""
         cls = self.agent.entity_class(implementation.entity_type)
-        adapter = cls(self.agent, implementation.entity_type, validate=False)
+        if not adapter:
+            adapter = cls(self.agent, implementation.entity_type, validate=False)
         self.implementations[implementation.key] = adapter
         adapter._add_implementation(implementation)
         adapter._refresh()
         self.add(adapter)
 
-    def add_implementation(self, implementation): self._add_implementation(implementation)
+    def add_implementation(self, implementation, adapter=None):
+        self._add_implementation(implementation, adapter=adapter)
 
     def _remove(self, entity):
         try:
@@ -695,8 +705,12 @@ class Agent(object):
     def _create(self, attributes):
         """Create an entity, called externally or from configuration file."""
         entity = self.create_entity(attributes)
-        self.add_entity(entity)
-        entity.create()
+        pointer = entity.create()
+        if pointer:
+            cimplementation = CImplementation(self.qd, entity.entity_type, pointer)
+            self.entities.add_implementation(cimplementation, entity)
+        else:
+            self.add_entity(entity)
         return entity
 
     def create(self, request):

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/68aab381/src/connection_manager.c
----------------------------------------------------------------------
diff --git a/src/connection_manager.c b/src/connection_manager.c
index 794ddee..039202e 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -183,30 +183,53 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
     return qd_error_code();
 }
 
-void qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_entity_t *entity)
+qd_config_listener_t *qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_entity_t *entity)
 {
+    qd_error_clear();
     qd_connection_manager_t *cm = qd->connection_manager;
     qd_config_listener_t *cl = NEW(qd_config_listener_t);
     cl->is_connector = false;
     cl->listener = 0;
-    load_server_config(qd, &cl->configuration, entity);
+    if (load_server_config(qd, &cl->configuration, entity) != QD_ERROR_NONE) {
+        qd_log(cm->log_source, QD_LOG_ERROR, "Unable to create config listener: %s", qd_error_message());
+        qd_config_listener_free(cl);
+        return 0;
+    }
     DEQ_ITEM_INIT(cl);
     DEQ_INSERT_TAIL(cm->config_listeners, cl);
     qd_log(cm->log_source, QD_LOG_INFO, "Configured Listener: %s:%s role=%s",
            cl->configuration.host, cl->configuration.port, cl->configuration.role);
+
+    return cl;
+}
+
+
+qd_error_t qd_entity_refresh_listener(qd_entity_t* entity, void *impl)
+{
+    return QD_ERROR_NONE;
 }
 
 
-qd_error_t qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity)
+qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl)
+{
+    return QD_ERROR_NONE;
+}
+
+
+qd_config_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity)
 {
     qd_error_clear();
     qd_connection_manager_t *cm = qd->connection_manager;
     qd_config_connector_t *cc = NEW(qd_config_connector_t);
     memset(cc, 0, sizeof(*cc));
     cc->is_connector = true;
-    if (load_server_config(qd, &cc->configuration, entity))
-        return qd_error_code();
+    if (load_server_config(qd, &cc->configuration, entity) != QD_ERROR_NONE) {
+        qd_log(cm->log_source, QD_LOG_ERROR, "Unable to create config connector: %s", qd_error_message());
+        qd_config_connector_free(cc);
+        return 0;
+    }
     DEQ_ITEM_INIT(cc);
+
     if (strcmp(cc->configuration.role, "route-container") == 0) {
         DEQ_INSERT_TAIL(cm->on_demand_connectors, cc);
         qd_log(cm->log_source, QD_LOG_INFO, "Configured route-container connector: %s:%s name=%s",
@@ -217,7 +240,8 @@ qd_error_t qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entit
         qd_log(cm->log_source, QD_LOG_INFO, "Configured Connector: %s:%s role=%s",
                cc->configuration.host, cc->configuration.port, cc->configuration.role);
     }
-    return QD_ERROR_NONE;
+
+    return cc;
 }
 
 
@@ -288,6 +312,41 @@ void qd_connection_manager_start(qd_dispatch_t *qd)
     }
 }
 
+void qd_config_connector_free(qd_config_connector_t *cc)
+{
+    if (cc->connector)
+        qd_server_connector_free(cc->connector);
+    free(cc);
+}
+
+void qd_config_listener_free(qd_config_listener_t *cl)
+{
+    if (cl->listener)
+        qd_server_listener_close(cl->listener);
+        qd_server_listener_free(cl->listener);
+    free(cl);
+}
+
+void qd_connection_manager_delete_listener(qd_dispatch_t *qd, void *impl)
+{
+    qd_config_listener_t *cl = (qd_config_listener_t*)impl;
+
+    if(cl) {
+        qd_server_listener_close(cl->listener);
+        DEQ_REMOVE(qd->connection_manager->config_listeners, cl);
+        qd_config_listener_free(cl);
+    }
+}
+
+void qd_connection_manager_delete_connector(qd_dispatch_t *qd, void *impl)
+{
+    qd_config_connector_t *cc = (qd_config_connector_t*)impl;
+
+    if(cc) {
+        DEQ_REMOVE(qd->connection_manager->config_connectors, cc);
+        qd_config_connector_free(cc);
+    }
+}
 
 qd_config_connector_t *qd_connection_manager_find_on_demand(qd_dispatch_t *qd, const char *name)
 {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/68aab381/tests/system_tests_qdmanage.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_qdmanage.py b/tests/system_tests_qdmanage.py
index 2103915..f11bec4 100644
--- a/tests/system_tests_qdmanage.py
+++ b/tests/system_tests_qdmanage.py
@@ -18,7 +18,7 @@
 #
 
 import re, json, unittest, os
-from system_test import TestCase, Process, Qdrouterd, main_module, TIMEOUT, DIR
+from system_test import TestCase, Process, Qdrouterd, main_module, TIMEOUT, DIR, wait_port
 from subprocess import PIPE, STDOUT
 from qpid_dispatch_internal.compat import OrderedDict, dictify
 from qpid_dispatch_internal.management.qdrouter import QdSchema
@@ -36,18 +36,30 @@ class QdmanageTest(TestCase):
     @classmethod
     def setUpClass(cls):
         super(QdmanageTest, cls).setUpClass()
-        config = Qdrouterd.Config([
+        cls.inter_router_port = cls.tester.get_port()
+        config_1 = Qdrouterd.Config([
+            ('container', {'workerThreads': 4, 'containerName': 'Qpid.Dispatch.Router.1'}),
+            ('router', {'mode': 'interior', 'routerId': 'R1'}),
             ('ssl-profile', {'name': 'server-ssl',
                              'cert-db': cls.ssl_file('ca-certificate.pem'),
                              'cert-file': cls.ssl_file('server-certificate.pem'),
                              'key-file': cls.ssl_file('server-private-key.pem'),
                              'password': 'server-password'}),
             ('listener', {'port': cls.tester.get_port()}),
+            ('connector', {'role': 'inter-router', 'port': cls.inter_router_port}),
             ('listener', {'port': cls.tester.get_port(), 'ssl-profile': 'server-ssl'})
         ])
-        cls.router = cls.tester.qdrouterd('test-router', config, wait=True)
 
-    def address(self): return self.router.addresses[0]
+        config_2 = Qdrouterd.Config([
+            ('container', {'workerThreads': 4, 'containerName': 'Qpid.Dispatch.Router.2'}),
+            ('router', {'mode': 'interior', 'routerId': 'R2'}),
+            ('listener', {'role': 'inter-router', 'port': cls.inter_router_port}),
+        ])
+        cls.router_2 = cls.tester.qdrouterd('test_router_2', config_2, wait=True)
+        cls.router_1 = cls.tester.qdrouterd('test_router_1', config_1, wait=True)
+
+    def address(self):
+        return self.router_1.addresses[0]
 
     def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
         p = self.popen(
@@ -94,7 +106,6 @@ class QdmanageTest(TestCase):
         self.run_qdmanage('delete --name mydummy')
         self.run_qdmanage('read --name=mydummy', expect=Process.EXIT_FAIL)
 
-
     def test_stdin(self):
         """Test piping from stdin"""
         def check(cmd, expect, input, copy=None):
@@ -121,22 +132,27 @@ class QdmanageTest(TestCase):
         check_list('update', expect_list, json.dumps(expect_list))
 
     def test_query(self):
-        def long_type(name): return u'org.apache.qpid.dispatch.'+name
+
+        def long_type(name):
+            return u'org.apache.qpid.dispatch.'+name
+
         types = ['listener', 'log', 'container', 'router']
         long_types = [long_type(name) for name in types]
 
         qall = json.loads(self.run_qdmanage('query'))
         qall_types = set([e['type'] for e in qall])
-        for t in long_types: self.assertIn(t, qall_types)
+        for t in long_types:
+            self.assertIn(t, qall_types)
 
         qlistener = json.loads(self.run_qdmanage('query --type=listener'))
         self.assertEqual([long_type('listener')]*2, [e['type'] for e in qlistener])
-        self.assertEqual(self.router.ports[0], int(qlistener[0]['port']))
+        self.assertEqual(self.router_1.ports[0], int(qlistener[0]['port']))
 
         qattr = json.loads(
             self.run_qdmanage('query type name'))
 
-        for e in qattr: self.assertEqual(2, len(e))
+        for e in qattr:
+            self.assertEqual(2, len(e))
 
         def name_type(entities):
             ignore_types = [long_type(t) for t in ['router.link', 'connection', 'router.address']]
@@ -158,12 +174,36 @@ class QdmanageTest(TestCase):
 
     def test_ssl(self):
         """Simple test for SSL connection. Note system_tests_qdstat has a more complete SSL test"""
-        url = Url(self.router.addresses[1], scheme="amqps")
+        url = Url(self.router_1.addresses[1], scheme="amqps")
         schema = dictify(QdSchema().dump())
         actual = self.run_qdmanage("GET-JSON-SCHEMA")
         self.assertEquals(schema, dictify(json.loads(actual)))
 
-    def test_add_connector(self):
+    def create(self, type, name, port):
+        create_command = 'CREATE --type=' + type + ' --name=' + name + ' addr=0.0.0.0 port=' + port
+        connector = json.loads(self.run_qdmanage(create_command))
+        return connector
+
+    def test_create_delete_connector(self):
+        long_type = 'org.apache.qpid.dispatch.connector'
+        query_command = 'QUERY --type=' + long_type
+        output = json.loads(self.run_qdmanage(query_command))
+        name = output[0]['name']
+
+        # Delete an existing connector
+        delete_command = 'DELETE --type=' + long_type + ' --name=' + name
+        self.run_qdmanage(delete_command)
+        output = json.loads(self.run_qdmanage(query_command))
+        self.assertEqual(output, [])
+
+        # Re-create the connector, then read the resulting connection entity to confirm it exists
+        self.create(long_type, name, str(QdmanageTest.inter_router_port))
+        full_name = 'connection/0.0.0.0:' + str(QdmanageTest.inter_router_port)
+        output = json.loads(self.run_qdmanage('READ --type=org.apache.qpid.dispatch.connection --name ' + full_name))
+
+        self.assertEquals(full_name, output['name'])
+
+    def test_zzz_add_connector(self):
         port = self.get_port()
         # dont provide role and make sure that role is defaulted to 'normal'
         command = "CREATE --type=connector --name=eaconn1 port=" + str(port) + " addr=0.0.0.0"
@@ -188,17 +228,68 @@ class QdmanageTest(TestCase):
         output = json.loads(self.run_qdmanage(command))
         self.assertEqual("normal", output['role'])
 
-        exception = False
-        port = self.get_port()
-        # provide mode as 'standalone' and role as 'inter-router'. This combination is not allowed
-        command = "CREATE --type=connector --name=eaconn3 port=" + str(port) + " addr=0.0.0.0 role=inter-router"
+    def test_zzz_create_delete_listener(self):
+        long_type = 'org.apache.qpid.dispatch.listener'
+        name = 'ealistener'
+
+        listener_port = self.get_port()
+
+        listener = self.create(long_type, name, str(listener_port))
+        self.assertEquals(listener['type'], long_type)
+        self.assertEquals(listener['name'], name)
+
+        exception_occurred = False
+
         try:
-            output = json.loads(self.run_qdmanage(command))
+            # The listener was just created, so connecting to its port should not return an error
+            wait_port(listener_port, host='127.0.0.1', timeout=2)
         except Exception as e:
-            self.assertTrue("BadRequestStatus: role='inter-router' only allowed with router mode='interior'" in e.message)
-            exception = True
+            exception_occurred = True
 
-        self.assertTrue(exception)
+        self.assertFalse(exception_occurred)
+
+        delete_command = 'DELETE --type=' + long_type + ' --name=' + name
+        self.run_qdmanage(delete_command)
+
+        exception_occurred = False
+        try:
+            # Try deleting an already deleted listener, this should raise an exception
+            self.run_qdmanage(delete_command)
+        except Exception as e:
+            exception_occurred = True
+            self.assertTrue("NotFoundStatus: No entity with name='" + name + "'" in e.message)
+
+        self.assertTrue(exception_occurred)
+
+        try:
+            # The listener was deleted, so connecting to its port should now return an error
+            wait_port(listener_port, host='127.0.0.1', timeout=2)
+        except Exception as e:
+            exception_occurred = True
+
+        self.assertTrue(exception_occurred)
+
+        # Now try the same thing with a short_type
+        short_type = 'listener'
+
+        listener_port = self.get_port()
+
+        listener = self.create(long_type, name, str(listener_port))
+        self.assertEquals(listener['type'], long_type)
+        self.assertEquals(listener['name'], name)
+
+        delete_command = 'DELETE --type=' + short_type + ' --name=' + name
+        self.run_qdmanage(delete_command)
+
+        exception_occurred = False
+
+        try:
+            # The listener was deleted, so connecting to its port should now return an error
+            wait_port(listener_port, host='127.0.0.1', timeout=2)
+        except Exception as e:
+            exception_occurred = True
+
+        self.assertTrue(exception_occurred)
 
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org