You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/10/18 01:35:37 UTC
svn commit: r1632696 [1/2] - in /qpid/dispatch/trunk: doc/api/ doc/man/
include/qpid/dispatch/ python/qpid_dispatch/
python/qpid_dispatch/management/ python/qpid_dispatch/qpid_dispatch/
python/qpid_dispatch_internal/ python/qpid_dispatch_internal/manag...
Author: aconway
Date: Fri Oct 17 23:35:35 2014
New Revision: 1632696
URL: http://svn.apache.org/r1632696
Log:
DISPATCH-56: Unifiy C and python management agents.
There are 2 management agents.
Previously the C agent was at the default address $management, python agent at $management2
This commit completes all missing features of the python agent and makes it the default on $management.
The old C agent is still available on $cmanagement, but it is deprecated and will be removed.
- Added C entity types to the python schema: router.link, router.address, router.node, connection, allocator
- Update attribute values from C in python agent.
- Implemented get-operations, get-attributes, get-mgmt-nodes
- Added optional file:line in logging.
Added:
qpid/dispatch/trunk/include/qpid/dispatch/enum.h
qpid/dispatch/trunk/include/qpid/dispatch/static_assert.h
- copied, changed from r1632169, qpid/dispatch/trunk/src/static_assert.h
qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json
- copied, changed from r1632169, qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.json
qpid/dispatch/trunk/src/aprintf.h
qpid/dispatch/trunk/src/c_entity.c
qpid/dispatch/trunk/src/c_entity.h
- copied, changed from r1632169, qpid/dispatch/trunk/src/static_assert.h
Removed:
qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.json
qpid/dispatch/trunk/src/static_assert.h
Modified:
qpid/dispatch/trunk/doc/api/doxygen-wrapper.py
qpid/dispatch/trunk/doc/man/CMakeLists.txt
qpid/dispatch/trunk/doc/man/qdrouterd_conf_man.py
qpid/dispatch/trunk/include/qpid/dispatch/dispatch.h
qpid/dispatch/trunk/include/qpid/dispatch/error.h
qpid/dispatch/trunk/include/qpid/dispatch/log.h
qpid/dispatch/trunk/python/qpid_dispatch/management/client.py
qpid/dispatch/trunk/python/qpid_dispatch/management/entity.py
qpid/dispatch/trunk/python/qpid_dispatch/qpid_dispatch/site_data.py
qpid/dispatch/trunk/python/qpid_dispatch/site.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/dispatch_c.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/management/agent.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/management/config.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/router/configuration.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py
qpid/dispatch/trunk/src/CMakeLists.txt
qpid/dispatch/trunk/src/agent.c
qpid/dispatch/trunk/src/alloc.c
qpid/dispatch/trunk/src/connection_manager.c
qpid/dispatch/trunk/src/container.c
qpid/dispatch/trunk/src/dispatch.c
qpid/dispatch/trunk/src/dispatch_private.h
qpid/dispatch/trunk/src/entity.c
qpid/dispatch/trunk/src/entity_private.h
qpid/dispatch/trunk/src/error.c
qpid/dispatch/trunk/src/log.c
qpid/dispatch/trunk/src/message.c
qpid/dispatch/trunk/src/python_embedded.c
qpid/dispatch/trunk/src/router_agent.c
qpid/dispatch/trunk/src/router_node.c
qpid/dispatch/trunk/src/router_private.h
qpid/dispatch/trunk/src/router_pynode.c
qpid/dispatch/trunk/src/server.c
qpid/dispatch/trunk/src/server_private.h
qpid/dispatch/trunk/src/waypoint.c
qpid/dispatch/trunk/tests/management/client.py
qpid/dispatch/trunk/tests/management/qdrouter.py
qpid/dispatch/trunk/tests/management/schema.py
qpid/dispatch/trunk/tests/run.py.in
qpid/dispatch/trunk/tests/system_test.py
qpid/dispatch/trunk/tests/system_tests_management.py
qpid/dispatch/trunk/tests/system_tests_one_router.py
qpid/dispatch/trunk/tests/system_tests_qdmanage.py
qpid/dispatch/trunk/tests/system_tests_two_routers.py
qpid/dispatch/trunk/tools/qdmanage
Modified: qpid/dispatch/trunk/doc/api/doxygen-wrapper.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/doc/api/doxygen-wrapper.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/doc/api/doxygen-wrapper.py (original)
+++ qpid/dispatch/trunk/doc/api/doxygen-wrapper.py Fri Oct 17 23:35:35 2014
@@ -18,7 +18,7 @@
# under the License.
#
-import sys, subprocess;
+import sys, subprocess
doxygen, config = sys.argv[1:3]
outfile = config+".output"
Modified: qpid/dispatch/trunk/doc/man/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/doc/man/CMakeLists.txt?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/doc/man/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/doc/man/CMakeLists.txt Fri Oct 17 23:35:35 2014
@@ -26,7 +26,7 @@ set (QDROUTERD_CONF qdrouterd.conf.5)
file (GLOB_RECURSE QDROUTERD_CONF_DEPENDS
${CMAKE_CURRENT_SOURCE_DIR}/qdrouterd_conf_man.py
${CMAKE_SOURCE_DIR}/python/qpid_router_internal/management/*.py
- ${CMAKE_SOURCE_DIR}/python/qpid_router_internal/management/qdrouterd.json)
+ ${CMAKE_SOURCE_DIR}/python/qpid_router/management/qdrouterd.json)
add_custom_command (OUTPUT ${QDROUTERD_CONF}
COMMAND ${PYTHON_EXECUTABLE} ${CMAKE_BINARY_DIR}/tests/run.py -s ${CMAKE_CURRENT_SOURCE_DIR}/qdrouterd_conf_man.py ${QDROUTERD_CONF}
Modified: qpid/dispatch/trunk/doc/man/qdrouterd_conf_man.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/doc/man/qdrouterd_conf_man.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/doc/man/qdrouterd_conf_man.py (original)
+++ qpid/dispatch/trunk/doc/man/qdrouterd_conf_man.py Fri Oct 17 23:35:35 2014
@@ -18,7 +18,8 @@
##
"""
-Generate the qdrouterd.conf man page from the qdrouterd management schema."""
+Generate the qdrouterd.conf man page from the qdrouterd management schema.
+"""
import sys
from qpid_dispatch_internal.management.qdrouter import QdSchema
@@ -134,10 +135,11 @@ listener {
f.write('.IP "Included by %s."\n'%(', '.join(used_by)))
f.write(".SH ENTITY SECTIONS\n\n")
- for name, entity_type in schema.entity_types.iteritems():
- f.write('.SS "%s"\n'% name)
- write_attributes(entity_type)
- f.write('.IP "Includes %s."\n'%(', '.join(entity_type.include)))
+ for entity_type in schema.entity_types.itervalues():
+ if "CREATE" in entity_type.operations and not entity_type.short_name == 'dummy':
+ f.write('.SS "%s"\n'% entity_type.short_name)
+ write_attributes(entity_type)
+ f.write('.IP "Includes %s."\n'%(', '.join(entity_type.include)))
if __name__ == '__main__':
Modified: qpid/dispatch/trunk/include/qpid/dispatch/dispatch.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/dispatch.h?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/dispatch.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/dispatch.h Fri Oct 17 23:35:35 2014
@@ -22,6 +22,8 @@
#include <qpid/dispatch/error.h>
typedef struct qd_entity_t qd_entity_t;
+typedef struct qd_c_entity_t qd_c_entity_t;
+typedef struct qd_c_entity_type_t qd_c_entity_type_t;
/**
* @defgroup dispatch
@@ -46,7 +48,7 @@ qd_dispatch_t *qd_dispatch(const char *p
*
* @param dispatch The dispatch handle returned by qd_dispatch
*/
-void qd_dispatch_free(qd_dispatch_t *dispatch);
+void qd_dispatch_free(qd_dispatch_t *qd);
/**
* Load the configuration file.
@@ -54,7 +56,7 @@ void qd_dispatch_free(qd_dispatch_t *dis
* @param dispatch The dispatch handle returned by qd_dispatch
* @param config_path The path to the configuration file.
*/
-qd_error_t qd_dispatch_load_config(qd_dispatch_t *dispatch, const char *config_path);
+qd_error_t qd_dispatch_load_config(qd_dispatch_t *qd, const char *config_path);
/**
* Configure the AMQP container from a configuration entity.
@@ -62,7 +64,7 @@ qd_error_t qd_dispatch_load_config(qd_di
* @param dispatch The dispatch handle returned by qd_dispatch
* @param entity The configuration entity.
*/
-qd_error_t qd_dispatch_configure_container(qd_dispatch_t *dispatch, qd_entity_t *entity);
+qd_error_t qd_dispatch_configure_container(qd_dispatch_t *qd, qd_entity_t *entity);
/**
* Configure the router node from a configuration entity.
@@ -71,7 +73,7 @@ qd_error_t qd_dispatch_configure_contain
* @param dispatch The dispatch handle returned by qd_dispatch.
* @param entity The configuration entity.
*/
-qd_error_t qd_dispatch_configure_router(qd_dispatch_t *dispatch, qd_entity_t *entity);
+qd_error_t qd_dispatch_configure_router(qd_dispatch_t *qd, qd_entity_t *entity);
/**
* Prepare Dispatch for operation. This must be called prior to
@@ -79,17 +81,17 @@ qd_error_t qd_dispatch_configure_router(
*
* @param dispatch The dispatch handle returned by qd_dispatch
*/
-qd_error_t qd_dispatch_prepare(qd_dispatch_t *dispatch);
+qd_error_t qd_dispatch_prepare(qd_dispatch_t *qd);
/**
* Configure an address, must be called after qd_dispatch_prepare
*/
-qd_error_t qd_dispatch_configure_address(qd_dispatch_t *dispatch, qd_entity_t *entity);
+qd_error_t qd_dispatch_configure_address(qd_dispatch_t *qd, qd_entity_t *entity);
/**
* Configure a waypoint, must be called after qd_dispatch_prepare
*/
-qd_error_t qd_dispatch_configure_waypoint(qd_dispatch_t *dispatch, qd_entity_t *entity);
+qd_error_t qd_dispatch_configure_waypoint(qd_dispatch_t *qd, qd_entity_t *entity);
/**
* \brief Configure the logging module from the
@@ -100,6 +102,15 @@ qd_error_t qd_dispatch_configure_waypoin
*/
qd_error_t qd_dispatch_configure_logging(qd_dispatch_t *qd);
+/** Register a managed entity implementation with the management agent.
+ * NOTE: impl must be unregistered before it is freed.
+ */
+void qd_dispatch_register_entity(qd_dispatch_t *qd, const char *type, void *impl);
+
+/** Unregister a managed entity implementation */
+void qd_dispatch_unregister_entity(qd_dispatch_t *qd, void *impl);
+
+
/**
* @}
*/
Added: qpid/dispatch/trunk/include/qpid/dispatch/enum.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/enum.h?rev=1632696&view=auto
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/enum.h (added)
+++ qpid/dispatch/trunk/include/qpid/dispatch/enum.h Fri Oct 17 23:35:35 2014
@@ -0,0 +1,57 @@
+#ifndef ENUM_H
+#define ENUM_H
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/static_assert.h>
+
+/**@file
+ *
+ * Tools for working with enums and enum string names.
+ *
+ * Enums should follow this pattern:
+ *
+ * // .h file
+ * typedef enum {
+ * FOO_VALUE1,
+ * FOO_VALUE2,
+ * } foo_t;
+ * ENUM_DECLARE(foo); // Declares const char *foo_name(foo_t value)
+ *
+ * // .c file
+ * static const char *const *foo_names = { "foo-value1", "foo-value2", ... };
+ * ENUM_DEFINE(foo, FOO_ENUM_COUNT, foo_names); // Defines const char *foo_name(foo_t value)
+ */
+
+/** Declares:
+ * const char *NAME_get_name(NAME_t value); // get name for value or NULL if value is invalid.
+ */
+#define ENUM_DECLARE(NAME) const char *NAME##_name(NAME##_t value)
+
+/** Defines:
+ * const char *NAME_name(NAME_t value)
+ */
+#define ENUM_DEFINE(NAME, NAME_ARRAY) \
+ const char *NAME##_name(NAME##_t value) { \
+ static const size_t count = sizeof(NAME_ARRAY)/sizeof(NAME_ARRAY[0]); \
+ return (0 <= value && value < count) ? NAME_ARRAY[value] : 0; \
+ } \
+ extern int NAME##__enum_dummy // So macro use can end with a ';'
+
+#endif
Modified: qpid/dispatch/trunk/include/qpid/dispatch/error.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/error.h?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/error.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/error.h Fri Oct 17 23:35:35 2014
@@ -19,6 +19,8 @@
* under the License.
*/
+#include <qpid/dispatch/enum.h>
+
/** @file
* Thread-safe error handling mechansim for dispatch.
*
@@ -44,9 +46,9 @@ typedef enum {
QD_ERROR_CONFIG, ///< Error in configuration
QD_ERROR_TYPE, ///< Value of inappropriate type.
QD_ERROR_VALUE, ///< Invalid value.
- QD_ERROR_RUNTIME, ///< Run-time failure.
- QD_ERROR_ENUM_COUNT ///< Not an error, marks the end of the enum.
+ QD_ERROR_RUNTIME ///< Run-time failure.
} qd_error_t;
+ENUM_DECLARE(qd_error);
/**
* Store thread-local error code and message.
@@ -54,7 +56,9 @@ typedef enum {
*@param fmt printf-stlye format.
*@return code
*/
-qd_error_t qd_error(qd_error_t code, const char *fmt, ...);
+#define qd_error(code, fmt, ...) qd_error_impl(code, __FILE__, __LINE__, fmt, ##__VA_ARGS__)
+
+qd_error_t qd_error_impl(qd_error_t code, const char *file, int line, const char *fmt, ...);
/**
* Clear thread-local error code and message.
@@ -85,7 +89,9 @@ extern const int QD_ERROR_MAX;
*
* @return QD_ERROR_PYTHON or QD_ERROR_NONE.
*/
-qd_error_t qd_error_py();
+#define qd_error_py() qd_error_py_impl(__FILE__, __LINE__)
+
+qd_error_t qd_error_py_impl(const char *file, int line);
#define QD_ERROR_RET() do { if (qd_error_code()) return qd_error_code(); } while(0)
#define QD_ERROR_PY_RET() do { if (qd_error_py()) return qd_error_code(); } while(0)
Modified: qpid/dispatch/trunk/include/qpid/dispatch/log.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/log.h?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/log.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/log.h Fri Oct 17 23:35:35 2014
@@ -47,8 +47,7 @@ void qd_log_impl(qd_log_source_t *source
* @param c qd_log_level_t log level of message
* @param f printf style format string ...
*/
-#define qd_log(s, c, f, ...) \
- do { if (qd_log_enabled(s,c)) qd_log_impl(s, c, __FILE__, __LINE__, f , ##__VA_ARGS__); } while(0)
+#define qd_log(s, c, f, ...) qd_log_impl(s, c, __FILE__, __LINE__, f , ##__VA_ARGS__)
/** Maximum length for a log message */
int qd_log_max_len();
Copied: qpid/dispatch/trunk/include/qpid/dispatch/static_assert.h (from r1632169, qpid/dispatch/trunk/src/static_assert.h)
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/static_assert.h?p2=qpid/dispatch/trunk/include/qpid/dispatch/static_assert.h&p1=qpid/dispatch/trunk/src/static_assert.h&r1=1632169&r2=1632696&rev=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/static_assert.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/static_assert.h Fri Oct 17 23:35:35 2014
@@ -38,4 +38,7 @@
extern char STATIC_ASSERTION__##msg[(expr)?1:2]
#endif /* #ifdef __GNUC__ */
+#define STATIC_ASSERT_ARRAY_LEN(array, len) \
+ STATIC_ASSERT(sizeof(array)/sizeof(array[0]) == len, array##_wrong_size);
+
#endif // STATIC_ASSERT_H
Modified: qpid/dispatch/trunk/python/qpid_dispatch/management/client.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch/management/client.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch/management/client.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch/management/client.py Fri Oct 17 23:35:35 2014
@@ -21,89 +21,11 @@
AMQP management client for Qpid dispatch.
"""
-import proton, re, threading
+import proton, threading
+from proton import Url
from .error import *
from .entity import Entity as BaseEntity, clean_dict
-class Url(object):
- """Simple AMQP URL parser/constructor"""
-
- URL_RE = re.compile(r"""
- # [ <scheme>:// ] [ <user> [ : <password> ] @] ( <host4> | \[ <host6> \] ) [ :<port> ] [ / path]
- ^ (?: ([^:/@]+)://)? (?: ([^:/@]+) (?: : ([^:/@]+) )? @)? (?: ([^@:/\[]+) | \[ ([a-f0-9:.]+) \] ) (?: :([0-9]+))? (?: / (.*))? $
-""", re.X | re.I)
- AMQPS = "amqps"
- AMQP = "amqp"
-
- def __init__(self, url=None, **kwargs):
- """
- @param url: String or Url instance to parse or copy.
- @param kwargs: URL fields: scheme, user, password, host, port, path.
- If specified, replaces corresponding component in url.
- """
-
- fields = ['scheme', 'user', 'password', 'host', 'port', 'path']
-
- for f in fields: setattr(self, f, None)
- for k in kwargs: getattr(self, k) # Check for invalid kwargs
-
- if isinstance(url, Url): # Copy from another Url instance.
- self.__dict__.update(url.__dict__)
-
- elif url is not None: # Parse from url
- match = Url.URL_RE.match(url)
- if match is None:
- raise ValueError("Invalid AMQP URL: %s"%url)
- self.scheme, self.user, self.password, host4, host6, port, self.path = match.groups()
- self.host = host4 or host6
- self.port = port and int(port)
-
- # Let kwargs override values previously set from url
- for field in fields:
- setattr(self, field, kwargs.get(field, getattr(self, field)))
-
- def __repr__(self):
- return "Url(%r)" % str(self)
-
- def __str__(self):
- s = ""
- if self.scheme:
- s += "%s://" % self.scheme
- if self.user:
- s += self.user
- if self.password:
- s += ":%s@" % self.password
- if self.host and ':' in self.host:
- s += "[%s]" % self.host
- else:
- s += self.host or '0.0.0.0'
- if self.port:
- s += ":%s" % self.port
- if self.path:
- s += "/%s" % self.path
- return s
-
- def __eq__(self, url):
- if isinstance(url, basestring):
- url = Url(url)
- return \
- self.scheme == url.scheme and \
- self.user == url.user and self.password == url.password and \
- self.host == url.host and self.port == url.port and \
- self.path == url.path
-
- def __ne__(self, url):
- return not self.__eq__(url)
-
- def defaults(self):
- """"Fill in defaults for scheme and port if missing """
- if not self.scheme: self.scheme = self.AMQP
- if not self.port:
- if self.scheme == self.AMQP: self.port = 5672
- elif self.scheme == self.AMQPS: self.port = 5671
- else: raise ValueError("Invalid URL scheme: %s"%self.scheme)
- return self
-
class MessengerImpl(object):
"""
Messaging implementation for L{Node} based on proton.Messenger
@@ -244,10 +166,6 @@ class Node(object):
response.properties.get('statusDescription')))
else:
raise ManagementError.create(code, response.properties.get('statusDescription'))
- if response.correlation_id != request.correlation_id:
- raise NotFoundStatus("Bad correlation id request=%s, response=%s" %
- (request.correlation_id, response.correlation_id))
-
def request(self, body=None, **properties):
"""
@@ -279,7 +197,10 @@ class Node(object):
if not request.reply_to:
raise ValueError("Message must have reply_to %s", request)
self.message_impl.send(request)
- response = self.message_impl.fetch()
+ while True:
+ response = self.message_impl.fetch()
+ # Ignore mismatched correlation IDs, responses to earlier requests that timed out.
+ if response.correlation_id == request.correlation_id: break
self.check_response(response, request, expect=expect)
return response
Modified: qpid/dispatch/trunk/python/qpid_dispatch/management/entity.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch/management/entity.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch/management/entity.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch/management/entity.py Fri Oct 17 23:35:35 2014
@@ -77,3 +77,10 @@ class Entity(object):
self.__delitem__(name)
def __repr__(self): return "Entity(%r)" % self.attributes
+
+def update(entity, values):
+ """Update entity from values
+ @param entity: an Entity
+ @param values: a map of values
+ """
+ for k, v in values.iteritems(): entity[k] = v
Copied: qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json (from r1632169, qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.json)
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json?p2=qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json&p1=qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.json&r1=1632169&r2=1632696&rev=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.json (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json Fri Oct 17 23:35:35 2014
@@ -85,7 +85,7 @@
},
"entity_types": {
"container": {
- "allows": "CR",
+ "operations": ["CREATE", "READ"],
"singleton": true,
"include": [
"common"
@@ -104,7 +104,7 @@
},
"router": {
- "allows": "CR",
+ "operations": ["CREATE", "READ"],
"singleton": true,
"include": [
"common"
@@ -151,13 +151,16 @@
"type": "Integer",
"default": 60,
"description": ""
- }
+ },
+ "addrCount": {"type": "Integer"},
+ "linkCount": {"type": "Integer"},
+ "nodeCount": {"type": "Integer"}
}
},
"listener": {
"description": "Listens for incoming connections to the router",
- "allows": "CR",
+ "operations": ["CREATE", "READ"],
"include": [
"common",
"ssl-profile",
@@ -193,7 +196,7 @@
},
"connector": {
"description": "Establishes an outgoing connections from the router",
- "allows": "CR",
+ "operations": ["CREATE", "READ"],
"include": [
"common",
"ssl-profile",
@@ -219,19 +222,20 @@
"include": [
"common"
],
- "allows": "CR",
+ "operations": ["CREATE", "READ"],
"attributes": {
"module": {
"type":[
"ROUTER",
"MESSAGE",
"SERVER",
- "AGENT",
- "PYAGENT",
+ "CAGENT",
+ "AGENT",
"CONTAINER",
"CONFIG",
"DEFAULT",
- "ERROR"
+ "ERROR",
+ "DISPATCH"
],
"required": true,
"description": "Module to configure logging level. The special module 'DEFAULT' specifies logging for modules that don't have explicit log sections."
@@ -253,7 +257,12 @@
"timestamp": {
"type": "Boolean",
"default": true,
- "description": "Set a timestamp on log messages"
+ "description": "Include timestamp in log messages"
+ },
+ "source": {
+ "type": "Boolean",
+ "default": false,
+ "description": "Include source file and line number in log messages"
},
"output": {
"type": "String",
@@ -263,7 +272,7 @@
},
"fixed-address": {
- "allows": "CR",
+ "operations": ["CREATE", "READ"],
"include": [
"common"
],
@@ -297,7 +306,7 @@
},
"waypoint": {
- "allows": "CR",
+ "operations": ["CREATE", "READ"],
"include": [
"common"
],
@@ -323,7 +332,7 @@
"dummy": {
"description": "Dummy entity for test purposes.",
- "allows": "CRUD",
+ "operations": ["CREATE", "READ", "UPDATE", "DELETE", "CALLME"],
"include": ["common"],
"attributes": {
"arg1": {"type": "String"},
@@ -331,6 +340,84 @@
"num1": {"type": "Integer"},
"num2": {"type": "Integer"}
}
+ },
+
+ "router.link": {
+ "description": "Link to another AMQP endpoint: router node, client or other AMQP process.",
+ "operations": ["READ"],
+ "include": ["common"],
+ "attributes": {
+ "linkType": {"type": ["endpoint", "waypoint", "inter-router", "inter-area"]},
+ "linkDir": {"type": ["in", "out"]},
+ "owningAddr": {"type": "String"},
+ "eventFifoDepth": {"type": "Integer"},
+ "msgFifoDepth": {"type": "Integer"}
+ }
+ },
+
+ "router.address": {
+ "description": "AMQP address managed by the router",
+ "operations": ["READ"],
+ "include": ["common"],
+ "attributes": {
+ "inProcess": {"type": "Boolean"},
+ "subscriberCount": {"type": "Integer"},
+ "remoteCount": {"type": "Integer"},
+ "deliveriesIngress": {"type": "Integer"},
+ "deliveriesEgress": {"type": "Integer"},
+ "deliveriesTransit": {"type": "Integer"},
+ "deliveriesToContainer": {"type": "Integer"},
+ "deliveriesFromContainer": {"type": "Integer"}
+ }
+ },
+
+ "router.node": {
+ "description": "AMQP node managed by the router",
+ "operations": ["READ"],
+ "include": ["common"],
+ "attributes": {
+ "addr": {"type": "String"},
+ "nextHop": {"type": "Integer"},
+ "routerLink": {"type": "Integer"},
+ "validOrigins": {"type": "List"}
+ }
+ },
+
+ "connection": {
+ "description": "Connections to the router's container",
+ "operations": ["READ"],
+ "include": ["common"],
+ "attributes": {
+ "container": {"type": "String"} ,
+ "state": {"type": [
+ "connecting",
+ "opening",
+ "operational",
+ "failed",
+ "user"
+ ]},
+ "host": {"type": "String"},
+ "dir": {"type": ["in", "out"]},
+ "role": {"type": "String"},
+ "sasl": {"type": "String"}
+ }
+ },
+
+ "allocator": {
+ "description": "Memory allocation pool.",
+ "operations": ["READ"],
+ "include": ["common"],
+ "attributes": {
+ "type_size": {"type": "Integer"},
+ "transfer_batch_size": {"type": "Integer"},
+ "local_free_list_max": {"type": "Integer"},
+ "global_free_list_max": {"type": "Integer"},
+ "total_alloc_from_heap": {"type": "Integer"},
+ "total_free_to_heap": {"type": "Integer"},
+ "held_by_threads": {"type": "Integer"},
+ "batches_rebalanced_to_threads": {"type": "Integer"},
+ "batches_rebalanced_to_global": {"type": "Integer"}
+ }
}
}
}
Modified: qpid/dispatch/trunk/python/qpid_dispatch/qpid_dispatch/site_data.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch/qpid_dispatch/site_data.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch/qpid_dispatch/site_data.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch/qpid_dispatch/site_data.py Fri Oct 17 23:35:35 2014
@@ -25,7 +25,5 @@ environment variables. This file will no
site.py will use env variables instead.
"""
-from os.path import join
-
QPID_DISPATCH_HOME = ""
QPID_DISPATCH_LIB = ""
Modified: qpid/dispatch/trunk/python/qpid_dispatch/site.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch/site.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch/site.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch/site.py Fri Oct 17 23:35:35 2014
@@ -27,13 +27,10 @@ def get_variable(name):
"""Get variable value by first checking os.environ, then site_data"""
value = os.environ.get(name)
if value: return value
- try:
- site_data = __import__('qpid_dispatch.site_data', globals(), locals(), [name])
- return getattr(site_data, name)
- except ImportError, e:
- raise ImportError("%s: Set %s environment variable." % (e, env))
+ site_data = __import__('qpid_dispatch.site_data', globals(), locals(), [name])
+ return getattr(site_data, name)
-for var in ['QPID_DISPATCH_HOME', 'QPID_DISPATCH_LIB']:
- globals()[var] = get_variable(var)
+QPID_DISPATCH_HOME = get_variable('QPID_DISPATCH_HOME')
+QPID_DISPATCH_LIB = get_variable('QPID_DISPATCH_LIB')
sys.path.insert(0, os.path.join(QPID_DISPATCH_HOME, 'python'))
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/dispatch_c.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/dispatch_c.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/dispatch_c.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/dispatch_c.py Fri Oct 17 23:35:35 2014
@@ -19,7 +19,8 @@
"""Access to functions in libqpid-dispatch.so"""
-import ctypes, os
+import ctypes
+from contextlib import contextmanager
from ctypes import c_char_p, c_long, py_object
from qpid_dispatch.site import QPID_DISPATCH_LIB
@@ -47,7 +48,6 @@ class QdDll(ctypes.PyDLL):
super(QdDll, self).__init__(lib)
# Types
- # TODO aconway 2014-06-27: can we use typed pointers instead of void*?
self.qd_dispatch_p = ctypes.c_void_p
# No check on qd_error_* functions, it would be recursive
@@ -63,9 +63,23 @@ class QdDll(ctypes.PyDLL):
self._prototype(self.qd_dispatch_configure_address, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_configure_waypoint, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_set_agent, None, [self.qd_dispatch_p, py_object])
+
self._prototype(self.qd_router_setup_late, None, [self.qd_dispatch_p])
+
+ self._prototype(self.qd_dispatch_router_lock, None, [self.qd_dispatch_p])
+ self._prototype(self.qd_dispatch_router_unlock, None, [self.qd_dispatch_p])
+
self._prototype(self.qd_connection_manager_start, None, [self.qd_dispatch_p])
self._prototype(self.qd_waypoint_activate_all, None, [self.qd_dispatch_p])
+ self._prototype(self.qd_c_entity_flush, c_long, [py_object])
+
+ @contextmanager
+ def scoped_dispatch_router_lock(self, dispatch):
+ self.qd_dispatch_router_lock(dispatch)
+ try:
+ yield
+ finally:
+ self.qd_dispatch_router_unlock(dispatch)
def _errcheck(self, result, func, args):
if self.qd_error_code():
@@ -76,6 +90,10 @@ class QdDll(ctypes.PyDLL):
f.restype = restype
f.argtypes = argtypes
if check: f.errcheck = self._errcheck
+ return f
+
+ def function(self, fname, restype, argtypes, check=True):
+ return self._prototype(getattr(self, fname), restype, argtypes, check)
def instance():
return QdDll.instance()
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/agent.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/management/agent.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/agent.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/management/agent.py Fri Oct 17 23:35:35 2014
@@ -23,15 +23,41 @@ Python agent for dispatch router.
Implements the server side of the AMQP management protocol for the dispatch router.
Manages a set of manageable Entities that can be Created, Read, Updated and Deleted.
Entity types are described by the schema in qdrouter.json.
+
+HOW IT WORKS:
+
+There are 3 types of entity:
+1 .Entities created via this agent, e.g. configuration entities.
+ Attributes are pushed into C code and cached by the agent.
+2. Entities created in C code and registered with the agent.
+ Attributes are pulled from C code before handling an operation.
+3. Combined: created as 1. and later registered via 2.
+ Attributes pulled from C overide the initial attributes.
+
+THREAD SAFETY:
+
+The agent is always driven by requests arriving in connection threads.
+Handling requests is serialized.
+
+Adding/removing/updating entities from C:
+- C code registers add/remove of C entities in a thread safe C cache.
+- Before handling an operation, the agent:
+ 1. locks the router
+ 2. flushes the add/remove cache and adds/removes entities in python cache.
+ 3. updates *all* known entites with a C implementation.
+ 4. unlocks the router.
"""
+import re
+from itertools import ifilter, chain
from traceback import format_exc
from threading import Lock
+from ctypes import c_void_p, py_object, c_long
from dispatch import IoAdapter, LogAdapter, LOG_DEBUG, LOG_ERROR
from qpid_dispatch.management.error import ManagementError, OK, CREATED, NO_CONTENT, STATUS_TEXT, \
BadRequestStatus, InternalServerErrorStatus, NotImplementedStatus, NotFoundStatus
from .. import dispatch_c
-from .schema import ValidationError, Entity as SchemaEntity
+from .schema import ValidationError, Entity as SchemaEntity, EntityType
from .qdrouter import QdSchema
from ..router.message import Message
@@ -65,19 +91,30 @@ class Entity(SchemaEntity):
Base class for agent entities with operations as well as attributes.
"""
- def __init__(self, agent, entity_type, attributes):
+ def _update(self): return False # Replaced by _set_pointer
+
+ def __init__(self, agent, entity_type, attributes=None, validate=True):
"""
- @para qd: Dispatch C library.
+ @para agent: Containing L{Agent}
@param dispatch: Pointer to qd_dispatch C object.
@param entity_type: L{EntityType}
- @param attribute: Attribute name:value map
+ @param attributes: Attribute name:value map
+ @param pointer: Pointer to C object that can be used to update attributes.
"""
- super(Entity, self).__init__(entity_type, attributes)
+ super(Entity, self).__init__(entity_type, attributes, validate=validate)
# Direct __dict__ access to avoid validation as schema attributes
self.__dict__['_agent'] = agent
self.__dict__['_qd'] = agent.qd
self.__dict__['_dispatch'] = agent.dispatch
+ def _set_pointer(self, pointer):
+ fname = "qd_c_entity_update_" + self.entity_type.short_name.replace('.', '_')
+ updatefn = self._qd.function(
+ fname, c_long, [py_object, c_void_p])
+ def _do_update():
+ updatefn(self.attributes, pointer);
+ return True
+ self.__dict__['_update'] = _do_update
def create(self, request):
"""Subclasses can add extra create actions here"""
@@ -92,26 +129,26 @@ class Entity(SchemaEntity):
return (OK, self.attributes)
def update(self, request):
+ """Handle update request with new attributes from management client"""
newattrs = dict(self.attributes, **request.body)
self.entity_type.validate(newattrs)
self.attributes = newattrs
return (OK, self.attributes)
def delete(self, request):
- self._agent.delete(self)
+ """Handle delete request from client"""
+ self._agent.remove(self)
return (NO_CONTENT, {})
-class ContainerEntity(Entity):
- def create(self, request):
- self._qd.qd_dispatch_configure_container(self._dispatch, request.body)
+class ContainerEntity(Entity): pass
class RouterEntity(Entity):
- def create(self, request):
- self._qd.qd_dispatch_configure_router(self._dispatch, self)
- self._qd.qd_dispatch_prepare(self._dispatch)
-
+ def __init__(self, *args, **kwargs):
+ kwargs['validate'] = False
+ super(RouterEntity, self).__init__(*args, **kwargs)
+ self._set_pointer(self._dispatch)
class LogEntity(Entity):
def create(self, request):
@@ -146,7 +183,6 @@ class DummyEntity(Entity):
def create(self, request):
self['identity'] = self.next_id()
- return (OK, self.attributes)
def next_id(self): return self.type+str(self.id_count.next())
@@ -154,41 +190,187 @@ class DummyEntity(Entity):
return (OK, dict(**request.properties))
+class CEntity(Entity):
+ """
+ Entity that is registered from C code rather than created via management.
+ """
+ def __init__(self, agent, entity_type, pointer):
+ super(CEntity, self).__init__(agent, entity_type, validate=False)
+ self._set_pointer(pointer)
+ self._update()
+ if not 'identity' in self.attributes:
+ self.attributes['identity'] = "%s:%s" % (entity_type.short_name, self.id_count.next())
+ self.attributes['identity'] = str(self.attributes['identity'])
+ self.validate()
+
+
+class RouterLinkEntity(CEntity):
+ id_count = AtomicCount()
+
+
+class RouterNodeEntity(CEntity):
+ id_count = AtomicCount()
+
+
+class RouterAddressEntity(CEntity):
+ id_count = AtomicCount()
+
+
+class ConnectionEntity(CEntity):
+ id_count = AtomicCount()
+
+
+class AllocatorEntity(CEntity):
+ id_count = AtomicCount()
+
+
+class EntityCache(object):
+ """
+ Searchable cache of entities, can be updated from C attributes.
+ """
+ def __init__(self, agent):
+ self.entities = []
+ self.pointers = {}
+ self.agent = agent
+ self.qd = self.agent.qd
+ self.schema = agent.schema
+
+ def log(self, *args): self.agent.log(*args)
+
+ def map_filter(self, function, test):
+ """Filter with test then apply function."""
+ return map(function, ifilter(test, self.entities))
+
+ def map_type(self, function, type):
+ """Apply function to all entities of type, if type is None do all entities"""
+ if type is None:
+ return map(function, self.entities)
+ else:
+ if isinstance(type, EntityType): type = type.name
+ else: type = self.schema.long_name(type)
+ return map(function, ifilter(lambda e: e.entity_type.name == type, self.entities))
+
+ def add(self, entity, pointer=None):
+ """Add an entity. Provide pointer if it is associated with a C entity"""
+ self.log(LOG_DEBUG, "Add %s entity: %s" %
+ (entity.entity_type.short_name, entity.attributes['identity']))
+ # Validate in the context of the existing entities for uniqueness
+ self.schema.validate_all(chain(iter([entity]), iter(self.entities)))
+ self.entities.append(entity)
+ if pointer: self.pointers[pointer] = entity
+
+ def _remove(self, entity):
+ try:
+ self.entities.remove(entity);
+ self.log(LOG_DEBUG, "Remove %s entity: %s" %
+ (entity.entity_type.short_name, entity.attributes['identity']))
+ except ValueError: pass
+
+ def remove(self, entity):
+ self._remove(entity)
+
+ def remove_pointer(self, pointer):
+ self._remove_pointer()
+
+ def _remove_pointer(self, pointer):
+ if pointer in self.pointers:
+ entity = self.pointers[pointer]
+ del self.pointers[pointer]
+ self._remove(entity)
+
+ def update_from_c(self):
+ """Update entities from the C dispatch runtime"""
+ events = []
+ REMOVE, ADD, REMOVE_ADD = 0, 1, 2
+
+ class Action(object):
+ """Collapse a sequence of add/remove actions down to None, remove, add or remove_add"""
+
+ MATRIX = { # Collaps pairs of actions
+ (None, ADD): ADD,
+ (None, REMOVE): REMOVE,
+ (REMOVE, ADD): REMOVE_ADD,
+ (ADD, REMOVE): None,
+ (REMOVE_ADD, REMOVE): REMOVE
+ }
+
+ def __init__(self, type):
+ self.action = None
+ self.type = type
+
+ def add(self, action):
+ try: self.action = self.MATRIX[(self.action, action)]
+ except KeyError: pass
+
+
+ with self.qd.scoped_dispatch_router_lock(self.agent.dispatch):
+ self.qd.qd_c_entity_flush(events)
+ # Collapse sequences of add/remove into a single remove/add/remove_add per pointer.
+ actions = {}
+ for action, type, pointer in events:
+ if not pointer in actions: actions[pointer] = Action(type)
+ actions[pointer].add(action)
+ for pointer, action in actions.iteritems():
+ if action.action == REMOVE or action.action == REMOVE_ADD:
+ self._remove_pointer(pointer)
+ if action.action == ADD or action.action == REMOVE_ADD:
+ entity_type = self.schema.entity_type(action.type)
+ klass = self.agent.entity_class(entity_type)
+ entity = klass(self.agent, entity_type, pointer)
+ self.add(entity, pointer)
+
+ for e in self.entities: e._update()
+
+
class Agent(object):
"""AMQP managment agent"""
def __init__(self, dispatch, attribute_maps=None):
self.qd = dispatch_c.instance()
self.dispatch = dispatch
- # FIXME aconway 2014-06-26: merge with $management
- self.io = [IoAdapter(self.receive, "$management2"),
- IoAdapter(self.receive, "$management2", True)] # Global
- self.log = LogAdapter("PYAGENT").log # FIXME aconway 2014-09-08: AGENT
self.schema = QdSchema()
- self.entities = [self.create_entity(attributes) for attributes in attribute_maps or []]
+ self.entities = EntityCache(self)
self.name = self.identity = 'self'
self.type = 'org.amqp.management' # AMQP management node type
+ for attributes in attribute_maps or []:
+ self.add_entity(self.create_entity(attributes))
+ self.request_lock = Lock()
+
+ def log(self, *args): pass # Replaced in activate.
+
+ SEP_RE = re.compile(r'-|\.')
+
+ def activate(self, address):
+ """Register the management address to receive management requests"""
+ self.io = [IoAdapter(self.receive, address),
+ IoAdapter(self.receive, address, True)] # Global
+ self.log = LogAdapter("AGENT").log
+
+ def entity_class(self, entity_type):
+ """Return the class that implements entity_type"""
+ class_name = ''.join([n.capitalize() for n in re.split(self.SEP_RE, entity_type.short_name)])
+ class_name += 'Entity'
+ entity_class = globals().get(class_name)
+ if not entity_class:
+ raise InternalServerErrorStatus("Can't find implementation for %s" % entity_type)
+ return entity_class
def create_entity(self, attributes):
"""Create an instance of the implementation class for an entity"""
if 'type' not in attributes:
raise BadRequestStatus("No 'type' attribute in %s" % attributes)
entity_type = self.schema.entity_type(attributes['type'])
- class_name = ''.join([n.capitalize() for n in entity_type.short_name.split('-')])
- class_name += 'Entity'
- entity_class = globals().get(class_name)
- if not entity_class:
- raise InternalServerErrorStatus("Can't find implementation for %s" % entity_type)
- return entity_class(self, entity_type, attributes)
+ return self.entity_class(entity_type)(self, entity_type, attributes)
def respond(self, request, status=OK, description=None, body=None):
"""Send a response to the client"""
+ if body is None: body = {}
description = description or STATUS_TEXT[status]
response = Message(
address=request.reply_to,
correlation_id=request.correlation_id,
properties={'statusCode': status, 'statusDescription': description},
- body=body or {})
+ body=body)
self.log(LOG_DEBUG, "Agent response:\n %s\n Responding to: \n %s"%(response, request))
try:
self.io[0].send(response)
@@ -197,20 +379,27 @@ class Agent(object):
def receive(self, request, link_id):
"""Called when a management request is received."""
- self.log(LOG_DEBUG, "Agent request %s on link %s"%(request, link_id))
- def error(e, trace):
- """Raise an error"""
- self.log(LOG_ERROR, "Error dispatching %s: %s\n%s"%(request, e, trace))
- self.respond(request, e.status, e.description)
- try:
- status, body = self.handle(request)
- self.respond(request, status=status, body=body)
- except ManagementError, e:
- error(e, format_exc())
- except ValidationError, e:
- error(BadRequestStatus(str(e)), format_exc())
- except Exception, e:
- error(InternalServerErrorStatus("%s: %s"%(type(e).__name__, e)), format_exc())
+ # Coarse locking, handle one request at a time.
+ with self.request_lock:
+ self.entities.update_from_c()
+ self.log(LOG_DEBUG, "Agent request %s on link %s"%(request, link_id))
+ def error(e, trace):
+ """Raise an error"""
+ self.log(LOG_ERROR, "Error dispatching %s: %s\n%s"%(request, e, trace))
+ self.respond(request, e.status, e.description)
+ try:
+ status, body = self.handle(request)
+ self.respond(request, status=status, body=body)
+ except ManagementError, e:
+ error(e, format_exc())
+ except ValidationError, e:
+ error(BadRequestStatus(str(e)), format_exc())
+ except Exception, e:
+ error(InternalServerErrorStatus("%s: %s"%(type(e).__name__, e)), format_exc())
+
+ def entity_type(self, type):
+ try: return self.schema.entity_type(type)
+ except ValidationError, e: raise NotFoundStatus(str(e))
def handle(self, request):
"""
@@ -219,39 +408,41 @@ class Agent(object):
@return: (response-code, body)
"""
operation = required_property('operation', request)
- type = request.properties.get('type')
+ type = request.properties.get('type') # Allow absent type for requests with a name.
if type == self.type or operation.lower() == 'create':
+ # Create requests are entity requests but must be handled by the agent since
+ # the entity does not yet exist.
target = self
else:
target = self.find_entity(request)
target.entity_type.allowed(operation)
try:
- method = getattr(target, operation.lower())
+ method = getattr(target, operation.lower().replace("-", "_"))
except AttributeError:
not_implemented(operation, target.type)
return method(request)
+ def requested_type(self, request):
+ type = request.properties.get('entityType')
+ if type: return self.entity_type(type)
+ else: return None
+
def query(self, request):
"""Management node query operation"""
- entity_type = request.properties.get('entityType')
- if entity_type:
- try:
- entity_type = self.schema.entity_type(entity_type)
- except:
- raise NotFoundStatus("Unknown entity type '%s'" % entity_type)
+ type = self.requested_type(request)
attribute_names = request.body.get('attributeNames')
if not attribute_names:
- if entity_type:
- attribute_names = entity_type.attributes.keys()
+ if type:
+ attribute_names = type.attributes.keys()
else: # Every attribute in the schema!
names = set()
for e in self.schema.entity_types.itervalues():
names.update(e.attributes.keys())
attribute_names = list(names)
- results = [[e.attributes.get(a) for a in attribute_names]
- for e in self.entities
- if not entity_type or e.type == entity_type.name]
+ attributes = self.entities.map_type(lambda e: e.attributes, type)
+ results = [[attrs.get(name) for name in attribute_names]
+ for attrs in attributes]
return (OK, {'attributeNames': attribute_names, 'results': results})
def create(self, request):
@@ -268,12 +459,39 @@ class Agent(object):
attributes[a] = value
entity = self.create_entity(attributes)
entity.entity_type.allowed('create')
- # Validate in the context of the existing entities for uniqueness
- self.schema.validate_all([entity]+self.entities)
entity.create(request) # Send the create request to the entity
- self.entities.append(entity)
+ self.add_entity(entity)
return (CREATED, entity.attributes)
+ def add_entity(self, entity): self.entities.add(entity)
+
+ def remove(self, entity): self.entities.remove(entity)
+
+ def get_types(self, request):
+ type = self.requested_type(request)
+ return (OK, dict((t, []) for t in self.schema.entity_types
+ if not type or type.name == t))
+
+ def get_operations(self, request):
+ type = self.requested_type(request)
+ return (OK, dict((t, et.operations)
+ for t, et in self.schema.entity_types.iteritems()
+ if not type or type.name == t))
+
+ def get_attributes(self, request):
+ type = self.requested_type(request)
+ return (OK, dict((t, [a for a in et.attributes])
+ for t, et in self.schema.entity_types.iteritems()
+ if not type or type.name == t))
+
+ def get_mgmt_nodes(self, request):
+ router = self.entities.map_type(None, 'router')[0]
+ area = router.attributes['area']
+ def node_address(node):
+ return "amqp:/_topo/%s/%s/$management" % (area, node.attributes['addr'][1:])
+ return (OK, self.entities.map_type(node_address, 'router.node'))
+
+
def find_entity(self, request):
"""Find the entity addressed by request"""
@@ -287,7 +505,7 @@ class Agent(object):
return " ".join(["%s=%r" % (k, v) for k, v in ids.iteritems()])
k, v = ids.iteritems().next() # Get the first id attribute
- found = [e for e in self.entities if e.attributes.get(k) == v]
+ found = self.entities.map_filter(None, lambda e: e.attributes.get(k) == v)
if len(found) == 1:
entity = found[0]
elif len(found) > 1:
@@ -299,13 +517,12 @@ class Agent(object):
for k, v in ids.iteritems():
if entity[k] != v: raise BadRequestStatus("Conflicting %s" % attrvals())
+ request_type = request.properties.get('type')
+ if request_type and not entity.entity_type.name_is(request_type):
+ raise NotFoundStatus("Entity type '%s' does match requested type '%s'" %
+ (entity.entity_type.name, request_type))
+
return entity
def find_entity_by_type(self, type):
- return [e for e in self.entities if e.entity_type.name == type]
-
- def delete(self, entity):
- try:
- self.entities.remove(entity)
- except ValueError:
- raise NotFoundStatus("Cannot delete, entity not found: %s"%entity)
+ return self.entities.map_type(None, type)
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/config.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/management/config.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/config.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/management/config.py Fri Oct 17 23:35:35 2014
@@ -98,7 +98,7 @@ class Config(object):
elif 'identity' in attrs:
attrs['name'] = attrs['identity']
else:
- identity = "%s%d"%(section_name, count)
+ identity = "%s:%d"%(section_name, count)
attrs['name'] = attrs['identity'] = identity
return content
@@ -137,6 +137,12 @@ def configure_dispatch(dispatch, filenam
qd = dispatch_c.instance()
dispatch = qd.qd_dispatch_p(dispatch)
config = Config(filename)
+
+ # NOTE: Can't import agent till till dispatch C extension module is initialized.
+ from .agent import Agent
+ agent = Agent(dispatch, config.entities)
+ qd.qd_dispatch_set_agent(dispatch, agent)
+
# Configure any DEFAULT log entities first so we can report errors in non-
# default log configurations to the correct place.
for l in config.by_type('log'):
@@ -147,10 +153,8 @@ def configure_dispatch(dispatch, filenam
qd.qd_dispatch_configure_router(dispatch, config.by_type('router').next())
qd.qd_dispatch_prepare(dispatch)
- # NOTE: Can't import agent till after qd_dispatch_prepare
- from .agent import Agent
- qd.qd_dispatch_set_agent(dispatch, Agent(dispatch, config.entities))
- qd.qd_router_setup_late(dispatch); # Must come after qd_dispatch_set_agent
+ agent.activate("$management")
+ qd.qd_router_setup_late(dispatch);
# Note must configure addresses, waypoints, listeners and connectors after qd_dispatch_prepare
for a in config.by_type('fixed-address'): qd.qd_dispatch_configure_address(dispatch, a)
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.py Fri Oct 17 23:35:35 2014
@@ -22,6 +22,7 @@ Qpid Dispatch Router management schema a
"""
import json
+from pkgutil import get_data
from . import schema
from ..compat import json_load_kwargs
@@ -29,12 +30,10 @@ class QdSchema(schema.Schema):
"""
Qpid Dispatch Router management schema.
"""
- SCHEMA_FILE = schema.schema_file("qdrouter.json")
-
def __init__(self):
"""Load schema."""
- with open(self.SCHEMA_FILE) as f:
- super(QdSchema, self).__init__(**json.load(f, **json_load_kwargs))
+ schema = get_data('qpid_dispatch.management', 'qdrouter.json')
+ super(QdSchema, self).__init__(**json.loads(schema, **json_load_kwargs))
def validate(self, entities, full=True, **kwargs):
"""
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py Fri Oct 17 23:35:35 2014
@@ -26,7 +26,7 @@ check for uniqueness of enties/attribute
A Schema can be loaded/dumped to a json file.
"""
-import os, sys
+import sys
from qpid_dispatch.management import entity
from qpid_dispatch.management.error import ForbiddenStatus
from ..compat import OrderedDict
@@ -36,11 +36,6 @@ class ValidationError(Exception):
pass
-def schema_file(name):
- """Return a file name relative to the directory from which this module was loaded."""
- return os.path.join(os.path.dirname(__file__), name)
-
-
class Type(object):
"""Base class for schema types.
@@ -73,6 +68,7 @@ class Type(object):
"""String name of type."""
return str(self.dump())
+
class BooleanType(Type):
"""A boolean schema type"""
@@ -95,6 +91,7 @@ class BooleanType(Type):
except:
raise ValidationError("Invalid Boolean value '%r'"%value)
+
class EnumValue(str):
"""A string that convets to an integer value via int()"""
@@ -108,6 +105,7 @@ class EnumValue(str):
def __ne__(self, x): return not self == x
def __repr__(self): return "EnumValue('%s', %s)"%(str(self), int(self))
+
class EnumType(Type):
"""An enumerated type"""
@@ -146,7 +144,8 @@ class EnumType(Type):
"""String description of enum type."""
return "One of [%s]"%(', '.join(self.tags))
-BUILTIN_TYPES = dict((t.name, t) for t in [Type("String", str), Type("Integer", int), BooleanType()])
+BUILTIN_TYPES = dict((t.name, t) for t in
+ [Type("String", str), Type("Integer", int), Type("List", list), BooleanType()])
def get_type(rep):
"""
@@ -313,7 +312,7 @@ class EntityType(AttributeTypeHolder):
#ivar include: List of names of sections included by this entity.
"""
def __init__(self, name, schema, singleton=False, include=None, attributes=None,
- description="", allows=""):
+ description="", operations=None):
"""
@param name: name of the entity type.
@param schema: schema for this type.
@@ -321,14 +320,14 @@ class EntityType(AttributeTypeHolder):
@param include: List of names of include types for this entity.
@param attributes: Map of attributes {name: {type:, default:, required:, unique:}}
@param description: Human readable description.
- @param allows: Allowed operations, string of "CRUD"
+ @param operations: Allowed operations, list of operation names.
"""
super(EntityType, self).__init__(name, schema, attributes, description)
self.short_name = schema.short_name(name)
self.refs = {'entity-type': name}
self.singleton = singleton
self.include = include
- self.allows = allows.upper()
+ self.operations = operations or []
if include and self.schema.includes:
for i in include:
if not i in schema.includes:
@@ -399,16 +398,18 @@ class EntityType(AttributeTypeHolder):
return attributes
- def allowed(self, operation):
+ def allowed(self, op):
"""Raise excepiton if op is not a valid operation on entity."""
- op = operation.upper()
- if op[0] not in self.allows:
+ op = op.upper()
+ if op not in self.operations:
raise ForbiddenStatus("Operation '%s' not allowed for '%s'" % (op, self.name))
def __repr__(self): return "%s(%s)" % (type(self).__name__, self.name)
def __str__(self): return self.name
+ def name_is(self, name):
+ return self.name == self.schema.long_name(name)
class Schema(object):
"""
@@ -448,7 +449,7 @@ class Schema(object):
def long_name(self, name):
"""Add prefix to unqualified name"""
if not name: return name
- if not '.' in name:
+ if not name.startswith(self.prefixdot):
name = self.prefixdot + name
return name
@@ -463,12 +464,15 @@ class Schema(object):
for e in self.entity_types.itervalues()))
])
- def entity_type(self, name):
- """Look up an EntityType by name"""
+ def entity_type(self, name, error=True):
+ """Look up an EntityType by name.
+ If error raise exception if not found else return None
+ """
try:
return self.entity_types[self.long_name(name)]
except KeyError:
- raise ValidationError("No such entity_type %r" % name)
+ if error: raise ValidationError("No such entity type '%s'" % name)
+ return None
def validate_entity(self, attributes, check_required=True, add_default=True,
check_unique=None, check_singleton=None):
@@ -514,7 +518,6 @@ class Schema(object):
add_default=add_default,
check_unique=check_unique,
check_singleton=check_singleton)
- return attribute_maps
def entity(self, attributes):
"""Convert an attribute map into an L{Entity}"""
@@ -528,10 +531,11 @@ class Schema(object):
class Entity(entity.Entity):
"""A map of attributes associated with an L{EntityType}"""
- def __init__(self, entity_type, attributes=None, **kwattrs):
+ def __init__(self, entity_type, attributes=None, validate=True, **kwattrs):
super(Entity, self).__init__(attributes, **kwattrs)
self.__dict__['entity_type'] = entity_type
- self.validate()
+ self.attributes.setdefault('type', entity_type.name)
+ if validate: self.validate()
def __setitem__(self, name, value):
super(Entity, self).__setitem__(name, value)
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/configuration.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/configuration.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/configuration.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/configuration.py Fri Oct 17 23:35:35 2014
@@ -25,7 +25,7 @@ class Configuration(object):
##
## Load default values
##
- self.values =
+ self.values = {}
##
## Apply supplied overrides
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py Fri Oct 17 23:35:35 2014
@@ -83,10 +83,8 @@ class RouterEngine:
@property
def config(self):
if not self._config:
- router_type = 'org.apache.qpid.dispatch.router'
- routers = self.router_adapter.get_agent().find_entity_by_type(router_type)
- if not routers: raise ValueError("No router configuration found")
- self._config = routers[0]
+ try: self._config = self.router_adapter.get_agent().find_entity_by_type('router')[0]
+ except IndexError: raise ValueError("No router configuration found")
return self._config
def addressAdded(self, addr):
Modified: qpid/dispatch/trunk/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/CMakeLists.txt?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/src/CMakeLists.txt Fri Oct 17 23:35:35 2014
@@ -26,7 +26,7 @@ set(GENERATED_SOURCES
file (GLOB_RECURSE GENERATOR_DEPENDS
${CMAKE_CURRENT_SOURCE_DIR}/schema_c.py
${CMAKE_SOURCE_DIR}/python/qpid_router_internal/management/*.py
- ${CMAKE_SOURCE_DIR}/python/qpid_router_internal/management/qdrouterd.json)
+ ${CMAKE_SOURCE_DIR}/python/qpid_router/management/qdrouterd.json)
add_custom_command (
OUTPUT ${GENERATED_SOURCES}
@@ -48,6 +48,7 @@ set(qpid_dispatch_SOURCES
container.c
dispatch.c
entity.c
+ c_entity.c
hash.c
iovec.c
iterator.c
Modified: qpid/dispatch/trunk/src/agent.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/agent.c?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/agent.c (original)
+++ qpid/dispatch/trunk/src/agent.c Fri Oct 17 23:35:35 2014
@@ -85,7 +85,7 @@ typedef enum {
// Convenience for logging, expects agent to be defined.
#define LOG(LEVEL, MSG, ...) qd_log(agent->log_source, QD_LOG_##LEVEL, MSG, ##__VA_ARGS__)
-static const char *AGENT_ADDRESS = "$management";
+static const char *AGENT_ADDRESS = "$cmanagement";
static const char *STATUS_CODE = "statusCode";
static const char *STATUS_DESCRIPTION = "statusDescription";
static const char *AP_ENTITY_TYPE = "entityType";
@@ -763,7 +763,7 @@ static qd_agent_class_t *qd_agent_regist
qd_agent_t *qd_agent(qd_dispatch_t *qd)
{
qd_agent_t *agent = NEW(qd_agent_t);
- agent->log_source = qd_log_source("AGENT");
+ agent->log_source = qd_log_source("CAGENT");
agent->qd = qd;
agent->class_hash = qd_hash(6, 10, 1);
DEQ_INIT(agent->class_list);
Modified: qpid/dispatch/trunk/src/alloc.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/alloc.c?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/alloc.c (original)
+++ qpid/dispatch/trunk/src/alloc.c Fri Oct 17 23:35:35 2014
@@ -17,6 +17,7 @@
* under the License.
*/
+#include <Python.h>
#include <qpid/dispatch/alloc.h>
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/log.h>
@@ -24,6 +25,8 @@
#include <memory.h>
#include <inttypes.h>
#include <stdio.h>
+#include "entity_private.h"
+#include "c_entity.h"
#define QD_MEMORY_DEBUG 1
@@ -95,6 +98,7 @@ static void qd_alloc_init(qd_alloc_type_
desc->header = PATTERN_FRONT;
desc->trailer = PATTERN_BACK;
+ qd_c_entity_add(QD_ALLOCATOR_TYPE, type_item);
}
sys_mutex_unlock(init_lock);
@@ -284,6 +288,7 @@ void qd_alloc_finalize(void)
qd_alloc_item_t *item;
qd_alloc_type_t *type_item = DEQ_HEAD(type_list);
while (type_item) {
+ qd_c_entity_remove(QD_ALLOCATOR_TYPE, type_item);
qd_alloc_type_desc_t *desc = type_item->desc;
//
@@ -342,10 +347,28 @@ void qd_alloc_finalize(void)
}
+qd_error_t qd_c_entity_update_allocator(qd_entity_t* entity, void *impl) {
+ qd_alloc_type_t *alloc_type = (qd_alloc_type_t*) impl;
+ if ((qd_entity_has(entity, "identity") ||
+ qd_entity_set_string(entity, "identity", alloc_type->desc->type_name) == 0) &&
+ qd_entity_set_long(entity, "type_size", alloc_type->desc->total_size) == 0 &&
+ qd_entity_set_long(entity, "transfer_batch_size", alloc_type->desc->config->transfer_batch_size) == 0 &&
+ qd_entity_set_long(entity, "local_free_list_max", alloc_type->desc->config->local_free_list_max) == 0 &&
+ qd_entity_set_long(entity, "global_free_list_max", alloc_type->desc->config->global_free_list_max) == 0 &&
+ qd_entity_set_long(entity, "total_alloc_from_heap", alloc_type->desc->stats->total_alloc_from_heap) == 0 &&
+ qd_entity_set_long(entity, "total_free_to_heap", alloc_type->desc->stats->total_free_to_heap) == 0 &&
+ qd_entity_set_long(entity, "held_by_threads", alloc_type->desc->stats->held_by_threads) == 0 &&
+ qd_entity_set_long(entity, "batches_rebalanced_to_threads", alloc_type->desc->stats->batches_rebalanced_to_threads) == 0 &&
+ qd_entity_set_long(entity, "batches_rebalanced_to_global", alloc_type->desc->stats->batches_rebalanced_to_global) == 0)
+ return QD_ERROR_NONE;
+ return qd_error_code();
+}
+
+
static void alloc_attr_name(void *object_handle, void *cor, void *unused)
{
- qd_alloc_type_t *item = (qd_alloc_type_t*) object_handle;
- qd_agent_value_string(cor, 0, item->desc->type_name);
+ qd_alloc_type_t *alloc_type = (qd_alloc_type_t*) object_handle;
+ qd_agent_value_string(cor, 0, alloc_type->desc->type_name);
}
@@ -412,7 +435,6 @@ static void alloc_attr_batches_rebalance
}
-static const char *ALLOC_TYPE = "org.apache.qpid.dispatch.allocator";
static const qd_agent_attribute_t ALLOC_ATTRIBUTES[] =
{{"name", alloc_attr_name, 0},
{"identity", alloc_attr_name, 0},
@@ -442,6 +464,14 @@ static void alloc_query_handler(void* co
void qd_alloc_setup_agent(qd_dispatch_t *qd)
{
- qd_agent_register_class(qd, ALLOC_TYPE, 0, ALLOC_ATTRIBUTES, alloc_query_handler);
+ qd_agent_register_class(qd, QD_ALLOCATOR_TYPE, 0, ALLOC_ATTRIBUTES, alloc_query_handler);
}
+
+// Entity add/remove event cache.
+
+
+struct event {
+ const char *type; /* Set for an add event, NULL for a remove event */
+ void *pointer;
+};
Added: qpid/dispatch/trunk/src/aprintf.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/aprintf.h?rev=1632696&view=auto
==============================================================================
--- qpid/dispatch/trunk/src/aprintf.h (added)
+++ qpid/dispatch/trunk/src/aprintf.h Fri Oct 17 23:35:35 2014
@@ -0,0 +1,69 @@
+#ifndef BPRINTF_H
+#define BPRINTF_H
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <errno.h>
+#include <assert.h>
+
+/**
+ Variadic appending printf - see aprintf()
+ */
+static int vaprintf(char **begin, char *end, const char *format, va_list ap_in) {
+ int size = end - *begin;
+ if (size == 0) return EINVAL;
+ va_list ap;
+ va_copy(ap, ap_in);
+ int n = vsnprintf(*begin, size, format, ap);
+ va_end(ap);
+ if (n < 0) return n;
+ if (n >= size) {
+ *begin = end-1;
+ assert(**begin == '\0');
+ return n;
+ }
+ *begin += n;
+ assert(*begin < end);
+ assert(**begin == '\0');
+ return 0;
+}
+
+/**
+ Appending printf.
+
+ Print to buffer at *begin with null terminatr, do not go beyond end.
+ Advance *begin to point to the null terminator.
+. Return value:
+ - 0 on success: advance *begin to the null terminator.
+ - n > 0: printing was truncated and would have printed n characters. *begin == end-1
+ - n < 0: error (return value of vsnprintf) no change to *begin
+ */
+static int aprintf(char **begin, char *end, const char *format, ...) {
+ va_list ap;
+ va_start(ap, format);
+ int n = vaprintf(begin, end, format, ap);
+ va_end(ap);
+ return n;
+}
+
+
+
+#endif
Added: qpid/dispatch/trunk/src/c_entity.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/c_entity.c?rev=1632696&view=auto
==============================================================================
--- qpid/dispatch/trunk/src/c_entity.c (added)
+++ qpid/dispatch/trunk/src/c_entity.c Fri Oct 17 23:35:35 2014
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/python_embedded.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/ctools.h>
+#include <structmember.h>
+#include "c_entity.h"
+#include "entity_private.h"
+#include "dispatch_private.h"
+#include "router_private.h"
+
+#include <stdbool.h>
+#include <pthread.h>
+
+
+typedef enum { REMOVE=0, ADD=1 } action_t;
+
+typedef struct entity_event_t {
+ DEQ_LINKS(struct entity_event_t);
+ action_t action;
+ const char *type;
+ void *object;
+} entity_event_t;
+
+DEQ_DECLARE(entity_event_t, entity_event_list_t);
+
+static entity_event_t *entity_event(action_t action, const char *type, void *object) {
+ entity_event_t *event = NEW(entity_event_t);
+ DEQ_ITEM_INIT(event);
+ event->action = action;
+ event->type = type;
+ event->object = object;
+ return event;
+}
+
+static sys_mutex_t *event_lock = 0;
+static entity_event_list_t event_list;
+
+void qd_c_entity_initialize(void) {
+ event_lock = sys_mutex();
+ DEQ_INIT(event_list);
+}
+
+static void push_event(action_t action, const char *type, void *object) {
+ if (!event_lock) return; /* Unit tests don't call qd_c_entity_initialize */
+ sys_mutex_lock(event_lock);
+ entity_event_t *event = entity_event(action, type, object);
+ DEQ_INSERT_TAIL(event_list, event);
+ sys_mutex_unlock(event_lock);
+}
+
+void qd_c_entity_add(const char *type, void *object) { push_event(ADD, type, object); }
+
+void qd_c_entity_remove(const char *type, void *object) { push_event(REMOVE, type, object); }
+
+// Flush events in the add/remove cache into a python list of (action, type, pointer)
+qd_error_t qd_c_entity_flush(PyObject *list) {
+ if (!event_lock) return QD_ERROR_NONE; /* Unit tests don't call qd_c_entity_initialize */
+ qd_error_clear();
+ sys_mutex_lock(event_lock);
+ fprintf(stderr, "qd_c_entity_flush %d\n", (int)DEQ_SIZE(event_list));
+ entity_event_t *event = DEQ_HEAD(event_list);
+ while (event) {
+ PyObject *tuple = Py_BuildValue("(isl)", (int)event->action, event->type, (long)event->object);
+ if (!tuple) { fprintf(stderr, "No tuple"); qd_error_py(); break; }
+ int err = PyList_Append(list, tuple);
+ Py_DECREF(tuple);
+ if (err) { fprintf(stderr, "No tuple"); qd_error_py(); break; }
+ DEQ_REMOVE_HEAD(event_list);
+ free(event);
+ event = DEQ_HEAD(event_list);
+ }
+ sys_mutex_unlock(event_lock);
+ return qd_error_code();
+}
+
+const char *QD_ALLOCATOR_TYPE = "allocator";
+const char *QD_CONNECTION_TYPE = "connection";
+const char *QD_ROUTER_TYPE = "router";
+const char *QD_ROUTER_NODE_TYPE = "router.node";
+const char *QD_ROUTER_ADDRESS_TYPE = "router.address";
+const char *QD_ROUTER_LINK_TYPE = "router.link";
Copied: qpid/dispatch/trunk/src/c_entity.h (from r1632169, qpid/dispatch/trunk/src/static_assert.h)
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/c_entity.h?p2=qpid/dispatch/trunk/src/c_entity.h&p1=qpid/dispatch/trunk/src/static_assert.h&r1=1632169&r2=1632696&rev=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/static_assert.h (original)
+++ qpid/dispatch/trunk/src/c_entity.h Fri Oct 17 23:35:35 2014
@@ -1,6 +1,5 @@
-#ifndef STATIC_ASSERT_H
-#define STATIC_ASSERT_H
-
+#ifndef C_ENTITY_H
+#define C_ENTITY_H 1
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -21,21 +20,28 @@
*/
/** @file
- * STATIC_ASSERT allows you to do compile time assertions at file scope or in a function.
- * @param expr: a boolean expression that is valid at compile time.
- * @param msg: a "message" that must also be a valid identifier, i.e. message_with_underscores
+ *
+ * Python-C interface for managed entity implementation objects.
+ *
+ * Maintain a cache of entity add and remove events that can be read by the
+ * python agent.
*/
+/** Initialize the module. */
+void qd_c_entity_initialize(void);
+
+/** Record an entity add event. */
+void qd_c_entity_add(const char *type, void *object);
+
+/** Record an entity remove event. */
+void qd_c_entity_remove(const char *type, void *object);
-#ifdef __GNUC__
-#define STATIC_ASSERT_HELPER(expr, msg) \
- (!!sizeof(struct { unsigned int STATIC_ASSERTION__##msg: (expr) ? 1 : -1; }))
-#define STATIC_ASSERT(expr, msg) \
- extern int (*assert_function__(void)) [STATIC_ASSERT_HELPER(expr, msg)]
-#else
- #define STATIC_ASSERT(expr, msg) \
- extern char STATIC_ASSERTION__##msg[1]; \
- extern char STATIC_ASSERTION__##msg[(expr)?1:2]
-#endif /* #ifdef __GNUC__ */
+/** Entity type strings */
+extern const char *QD_ALLOCATOR_TYPE;
+extern const char *QD_CONNECTION_TYPE;
+extern const char *QD_ROUTER_TYPE;
+extern const char *QD_ROUTER_NODE_TYPE;
+extern const char *QD_ROUTER_ADDRESS_TYPE;
+extern const char *QD_ROUTER_LINK_TYPE;
-#endif // STATIC_ASSERT_H
+#endif
Modified: qpid/dispatch/trunk/src/connection_manager.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/connection_manager.c?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/connection_manager.c (original)
+++ qpid/dispatch/trunk/src/connection_manager.c Fri Oct 17 23:35:35 2014
@@ -23,6 +23,7 @@
#include "dispatch_private.h"
#include "server_private.h"
#include "entity_private.h"
+#include "c_entity.h"
#include "schema_enum.h"
#include <string.h>
@@ -337,7 +338,6 @@ static void cm_attr_dir(void *object_han
}
-static const char *CONN_TYPE = "org.apache.qpid.dispatch.connection";
static const qd_agent_attribute_t CONN_ATTRIBUTES[] =
{{"name", cm_attr_name, 0},
{"identity", cm_attr_name, 0},
@@ -349,7 +349,6 @@ static const qd_agent_attribute_t CONN_A
{"dir", cm_attr_dir, 0},
{0, 0, 0}};
-
static void server_query_handler(void* context, void *cor)
{
qd_dispatch_t *qd = (qd_dispatch_t*) context;
@@ -365,10 +364,9 @@ static void server_query_handler(void* c
sys_mutex_unlock(qd_server->lock);
}
-
void qd_connection_manager_setup_agent(qd_dispatch_t *qd)
{
- qd_agent_register_class(qd, CONN_TYPE, qd, CONN_ATTRIBUTES, server_query_handler);
+ qd_agent_register_class(qd, QD_CONNECTION_TYPE, qd, CONN_ATTRIBUTES, server_query_handler);
}
Modified: qpid/dispatch/trunk/src/container.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Fri Oct 17 23:35:35 2014
@@ -440,9 +440,9 @@ qd_container_t *qd_container(qd_dispatch
DEQ_INIT(container->nodes);
DEQ_INIT(container->node_type_list);
- qd_log(container->log_source, QD_LOG_TRACE, "Container Initializing");
qd_server_set_conn_handler(qd, handler, container);
+ qd_log(container->log_source, QD_LOG_TRACE, "Container Initialized");
return container;
}
Modified: qpid/dispatch/trunk/src/dispatch.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/dispatch.c?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/dispatch.c (original)
+++ qpid/dispatch/trunk/src/dispatch.c Fri Oct 17 23:35:35 2014
@@ -22,6 +22,8 @@
#include <qpid/dispatch.h>
#include <qpid/dispatch/server.h>
#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/static_assert.h>
+
#include "dispatch_private.h"
#include "alloc_private.h"
#include "log_private.h"
@@ -29,12 +31,12 @@
#include "waypoint_private.h"
#include "message_private.h"
#include "entity_private.h"
-#include "static_assert.h"
+#include "c_entity.h"
/**
* Private Function Prototypes
*/
-qd_server_t *qd_server(int tc, const char *container_name);
+qd_server_t *qd_server(qd_dispatch_t *qd, int tc, const char *container_name);
void qd_connection_manager_setup_agent(qd_dispatch_t *qd);
void qd_server_free(qd_server_t *server);
qd_container_t *qd_container(qd_dispatch_t *qd);
@@ -49,11 +51,10 @@ void qd_error_initialize();
qd_dispatch_t *qd_dispatch(const char *python_pkgdir)
{
- qd_error_clear();
qd_dispatch_t *qd = NEW(qd_dispatch_t);
memset(qd, 0, sizeof(qd_dispatch_t));
- // alloc, log and error have to be initialized before any module.
+ qd_c_entity_initialize(); /* Must be first */
qd_alloc_initialize();
qd_log_initialize();
qd_error_initialize();
@@ -67,6 +68,7 @@ qd_dispatch_t *qd_dispatch(const char *p
if (qd_error_code()) { qd_dispatch_free(qd); return 0; }
qd_message_initialize();
if (qd_error_code()) { qd_dispatch_free(qd); return 0; }
+ qd->log_source = qd_log_source("DISPATCH");
return qd;
}
@@ -76,15 +78,15 @@ STATIC_ASSERT(sizeof(long) >= sizeof(voi
qd_error_t qd_dispatch_load_config(qd_dispatch_t *qd, const char *config_path)
{
- PyObject *module=0, *configure_dispatch=0, *result=0;
- bool ok =
- (module = PyImport_ImportModule("qpid_dispatch_internal.management.config")) &&
- (configure_dispatch = PyObject_GetAttrString(module, "configure_dispatch")) &&
- (result = PyObject_CallFunction(configure_dispatch, "(ls)", (long)qd, config_path));
- Py_XDECREF(module);
- Py_XDECREF(configure_dispatch);
- Py_XDECREF(result);
- return ok ? QD_ERROR_NONE : qd_error_py();
+ PyObject *module = PyImport_ImportModule("qpid_dispatch_internal.management.config");
+ if (!module) return qd_error_py();
+ PyObject *configure_dispatch = PyObject_GetAttrString(module, "configure_dispatch");
+ Py_DECREF(module);
+ if (!configure_dispatch) return qd_error_py();
+ PyObject *result = PyObject_CallFunction(configure_dispatch, "(ls)", (long)qd, config_path);
+ Py_DECREF(configure_dispatch);
+ if (!result) return qd_error_py();
+ return QD_ERROR_NONE;
}
@@ -121,7 +123,7 @@ qd_error_t qd_dispatch_configure_waypoin
qd_error_t qd_dispatch_prepare(qd_dispatch_t *qd)
{
- qd->server = qd_server(qd->thread_count, qd->container_name);
+ qd->server = qd_server(qd, qd->thread_count, qd->container_name);
qd->container = qd_container(qd);
qd->router = qd_router(qd, qd->router_mode, qd->router_area, qd->router_id);
qd->agent = qd_agent(qd);
@@ -155,3 +157,7 @@ void qd_dispatch_free(qd_dispatch_t *qd)
qd_alloc_finalize();
qd_python_finalize();
}
+
+
+void qd_dispatch_router_lock(qd_dispatch_t *qd) { sys_mutex_lock(qd->router->lock); }
+void qd_dispatch_router_unlock(qd_dispatch_t *qd) { sys_mutex_unlock(qd->router->lock); }
Modified: qpid/dispatch/trunk/src/dispatch_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/dispatch_private.h?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/dispatch_private.h (original)
+++ qpid/dispatch/trunk/src/dispatch_private.h Fri Oct 17 23:35:35 2014
@@ -55,6 +55,8 @@ struct qd_dispatch_t {
char *router_area;
char *router_id;
qd_router_mode_t router_mode;
+
+ qd_log_source_t *log_source;
};
void qd_dispatch_set_agent(qd_dispatch_t *qd, void *agent);
Modified: qpid/dispatch/trunk/src/entity.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/entity.c?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/entity.c (original)
+++ qpid/dispatch/trunk/src/entity.c Fri Oct 17 23:35:35 2014
@@ -50,12 +50,12 @@ char *qd_entity_string(qd_entity_t *enti
qd_error_clear();
PyObject *py_obj = qd_entity_get_py(entity, attribute);
PyObject *py_str = py_obj ? PyObject_Str(py_obj) : NULL;
- char* str = py_str ? PyString_AsString(py_str) : NULL;
+ const char *cstr = py_str ? PyString_AsString(py_str) : NULL;
+ char* str = cstr ? strdup(cstr) : NULL;
Py_XDECREF(py_obj);
Py_XDECREF(py_str);
- if (str) return strdup(str);
- qd_error_py();
- return NULL;
+ if (!str) qd_error_py();
+ return str;
}
long qd_entity_long(qd_entity_t *entity, const char* attribute) {
@@ -102,3 +102,83 @@ bool qd_entity_opt_bool(qd_entity_t *ent
}
return default_value;
}
+
+
+/**
+ * Set a value for an entity attribute. If py_value == NULL then clear the attribute.
+ * If the attribute exists and is a list, append this value to the list.
+ *
+ * NOTE: This function will Py_XDECREF(py_value).
+ */
+qd_error_t qd_entity_set_py(qd_entity_t* entity, const char* attribute, PyObject* py_value) {
+ qd_error_clear();
+
+ int result = 0;
+ PyObject *py_key = PyString_FromString(attribute);
+ if (py_key) {
+ if (py_value == NULL) { /* Delete the attribute */
+ result = PyObject_DelItem((PyObject*)entity, py_key);
+ PyErr_Clear(); /* Ignore error if it isn't there. */
+ }
+ else {
+ PyObject *old = PyObject_GetItem((PyObject*)entity, py_key);
+ PyErr_Clear(); /* Ignore error if it isn't there. */
+ if (old && PyList_Check(old)) /* Add to list */
+ result = PyList_Append(old, py_value);
+ else /* Set attribute */
+ result = PyObject_SetItem((PyObject*)entity, py_key, py_value);
+ Py_XDECREF(old);
+ }
+ }
+ Py_XDECREF(py_key);
+ Py_XDECREF(py_value);
+ return (py_key == NULL || result < 0) ? qd_error_py() : QD_ERROR_NONE;
+}
+
+qd_error_t qd_entity_set_string(qd_entity_t *entity, const char* attribute, const char *value) {
+ return qd_entity_set_py(entity, attribute, value ? PyString_FromString(value) : 0);
+}
+
+qd_error_t qd_entity_set_longp(qd_entity_t *entity, const char* attribute, const long *value) {
+ return qd_entity_set_py(entity, attribute, value ? PyInt_FromLong(*value) : 0);
+}
+
+qd_error_t qd_entity_set_boolp(qd_entity_t *entity, const char *attribute, const bool *value) {
+ return qd_entity_set_py(entity, attribute, value ? PyBool_FromLong(*value) : 0);
+}
+
+qd_error_t qd_entity_set_long(qd_entity_t *entity, const char* attribute, long value) {
+ return qd_entity_set_longp(entity, attribute, &value);
+}
+
+qd_error_t qd_entity_set_bool(qd_entity_t *entity, const char *attribute, bool value) {
+ return qd_entity_set_boolp(entity, attribute, &value);
+}
+
+qd_error_t qd_entity_clear(qd_entity_t *entity, const char *attribute) {
+ return qd_entity_set_py(entity, attribute, 0);
+}
+
+#define CHECK(err) if (err) return qd_error_code()
+
+qd_error_t qd_entity_set_list(qd_entity_t *entity, const char *attribute) {
+ CHECK(qd_entity_clear(entity, attribute));
+ return qd_entity_set_py(entity, attribute, PyList_New(0));
+}
+
+qd_error_t qd_entity_set_stringf(qd_entity_t *entity, const char* attribute, const char *format, ...)
+{
+ // Calculate the size
+ char dummy[1];
+ va_list ap;
+ va_start(ap, format);
+ int len = vsnprintf(dummy, 1, format, ap);
+ va_end(ap);
+
+ char buf[len+1];
+ va_start(ap, format);
+ vsnprintf(buf, len+1, format, ap);
+ va_end(ap);
+
+ return qd_entity_set_string(entity, attribute, buf);
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org