You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/10/18 01:35:37 UTC

svn commit: r1632696 [1/2] - in /qpid/dispatch/trunk: doc/api/ doc/man/ include/qpid/dispatch/ python/qpid_dispatch/ python/qpid_dispatch/management/ python/qpid_dispatch/qpid_dispatch/ python/qpid_dispatch_internal/ python/qpid_dispatch_internal/manag...

Author: aconway
Date: Fri Oct 17 23:35:35 2014
New Revision: 1632696

URL: http://svn.apache.org/r1632696
Log:
DISPATCH-56: Unifiy C and python management agents.

There are 2 management agents.
Previously the C agent was at the default address $management, python agent at $management2
This commit completes all missing features of the python agent and makes it the default on $management.
The old C agent is still available on $cmanagement, but it is deprecated and will be removed.

- Added C entity types to the python schema: router.link, router.address, router.node, connection, allocator
- Update attribute values from C in python agent.
- Implemented get-operations, get-attributes, get-mgmt-nodes
- Added optional file:line in logging.

Added:
    qpid/dispatch/trunk/include/qpid/dispatch/enum.h
    qpid/dispatch/trunk/include/qpid/dispatch/static_assert.h
      - copied, changed from r1632169, qpid/dispatch/trunk/src/static_assert.h
    qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json
      - copied, changed from r1632169, qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.json
    qpid/dispatch/trunk/src/aprintf.h
    qpid/dispatch/trunk/src/c_entity.c
    qpid/dispatch/trunk/src/c_entity.h
      - copied, changed from r1632169, qpid/dispatch/trunk/src/static_assert.h
Removed:
    qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.json
    qpid/dispatch/trunk/src/static_assert.h
Modified:
    qpid/dispatch/trunk/doc/api/doxygen-wrapper.py
    qpid/dispatch/trunk/doc/man/CMakeLists.txt
    qpid/dispatch/trunk/doc/man/qdrouterd_conf_man.py
    qpid/dispatch/trunk/include/qpid/dispatch/dispatch.h
    qpid/dispatch/trunk/include/qpid/dispatch/error.h
    qpid/dispatch/trunk/include/qpid/dispatch/log.h
    qpid/dispatch/trunk/python/qpid_dispatch/management/client.py
    qpid/dispatch/trunk/python/qpid_dispatch/management/entity.py
    qpid/dispatch/trunk/python/qpid_dispatch/qpid_dispatch/site_data.py
    qpid/dispatch/trunk/python/qpid_dispatch/site.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/dispatch_c.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/management/agent.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/management/config.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/router/configuration.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py
    qpid/dispatch/trunk/src/CMakeLists.txt
    qpid/dispatch/trunk/src/agent.c
    qpid/dispatch/trunk/src/alloc.c
    qpid/dispatch/trunk/src/connection_manager.c
    qpid/dispatch/trunk/src/container.c
    qpid/dispatch/trunk/src/dispatch.c
    qpid/dispatch/trunk/src/dispatch_private.h
    qpid/dispatch/trunk/src/entity.c
    qpid/dispatch/trunk/src/entity_private.h
    qpid/dispatch/trunk/src/error.c
    qpid/dispatch/trunk/src/log.c
    qpid/dispatch/trunk/src/message.c
    qpid/dispatch/trunk/src/python_embedded.c
    qpid/dispatch/trunk/src/router_agent.c
    qpid/dispatch/trunk/src/router_node.c
    qpid/dispatch/trunk/src/router_private.h
    qpid/dispatch/trunk/src/router_pynode.c
    qpid/dispatch/trunk/src/server.c
    qpid/dispatch/trunk/src/server_private.h
    qpid/dispatch/trunk/src/waypoint.c
    qpid/dispatch/trunk/tests/management/client.py
    qpid/dispatch/trunk/tests/management/qdrouter.py
    qpid/dispatch/trunk/tests/management/schema.py
    qpid/dispatch/trunk/tests/run.py.in
    qpid/dispatch/trunk/tests/system_test.py
    qpid/dispatch/trunk/tests/system_tests_management.py
    qpid/dispatch/trunk/tests/system_tests_one_router.py
    qpid/dispatch/trunk/tests/system_tests_qdmanage.py
    qpid/dispatch/trunk/tests/system_tests_two_routers.py
    qpid/dispatch/trunk/tools/qdmanage

Modified: qpid/dispatch/trunk/doc/api/doxygen-wrapper.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/doc/api/doxygen-wrapper.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/doc/api/doxygen-wrapper.py (original)
+++ qpid/dispatch/trunk/doc/api/doxygen-wrapper.py Fri Oct 17 23:35:35 2014
@@ -18,7 +18,7 @@
 # under the License.
 #
 
-import sys, subprocess;
+import sys, subprocess
 
 doxygen, config = sys.argv[1:3]
 outfile = config+".output"

Modified: qpid/dispatch/trunk/doc/man/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/doc/man/CMakeLists.txt?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/doc/man/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/doc/man/CMakeLists.txt Fri Oct 17 23:35:35 2014
@@ -26,7 +26,7 @@ set (QDROUTERD_CONF qdrouterd.conf.5)
 file (GLOB_RECURSE QDROUTERD_CONF_DEPENDS
   ${CMAKE_CURRENT_SOURCE_DIR}/qdrouterd_conf_man.py
   ${CMAKE_SOURCE_DIR}/python/qpid_router_internal/management/*.py
-  ${CMAKE_SOURCE_DIR}/python/qpid_router_internal/management/qdrouterd.json)
+  ${CMAKE_SOURCE_DIR}/python/qpid_router/management/qdrouterd.json)
 
 add_custom_command (OUTPUT ${QDROUTERD_CONF}
     COMMAND ${PYTHON_EXECUTABLE} ${CMAKE_BINARY_DIR}/tests/run.py -s ${CMAKE_CURRENT_SOURCE_DIR}/qdrouterd_conf_man.py ${QDROUTERD_CONF}

Modified: qpid/dispatch/trunk/doc/man/qdrouterd_conf_man.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/doc/man/qdrouterd_conf_man.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/doc/man/qdrouterd_conf_man.py (original)
+++ qpid/dispatch/trunk/doc/man/qdrouterd_conf_man.py Fri Oct 17 23:35:35 2014
@@ -18,7 +18,8 @@
 ##
 
 """
-Generate the qdrouterd.conf man page from the qdrouterd management schema."""
+Generate the qdrouterd.conf man page from the qdrouterd management schema.
+"""
 
 import sys
 from qpid_dispatch_internal.management.qdrouter import QdSchema
@@ -134,10 +135,11 @@ listener {
             f.write('.IP "Included by %s."\n'%(', '.join(used_by)))
 
         f.write(".SH ENTITY SECTIONS\n\n")
-        for name, entity_type in schema.entity_types.iteritems():
-            f.write('.SS "%s"\n'% name)
-            write_attributes(entity_type)
-            f.write('.IP "Includes %s."\n'%(', '.join(entity_type.include)))
+        for entity_type in schema.entity_types.itervalues():
+            if "CREATE" in entity_type.operations and not entity_type.short_name == 'dummy':
+                f.write('.SS "%s"\n'% entity_type.short_name)
+                write_attributes(entity_type)
+                f.write('.IP "Includes %s."\n'%(', '.join(entity_type.include)))
 
 
 if __name__ == '__main__':

Modified: qpid/dispatch/trunk/include/qpid/dispatch/dispatch.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/dispatch.h?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/dispatch.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/dispatch.h Fri Oct 17 23:35:35 2014
@@ -22,6 +22,8 @@
 #include <qpid/dispatch/error.h>
 
 typedef struct qd_entity_t qd_entity_t;
+typedef struct qd_c_entity_t  qd_c_entity_t;
+typedef struct qd_c_entity_type_t qd_c_entity_type_t;
 
 /**
  * @defgroup dispatch
@@ -46,7 +48,7 @@ qd_dispatch_t *qd_dispatch(const char *p
  *
  * @param dispatch The dispatch handle returned by qd_dispatch
  */
-void qd_dispatch_free(qd_dispatch_t *dispatch);
+void qd_dispatch_free(qd_dispatch_t *qd);
 
 /**
  * Load the configuration file.
@@ -54,7 +56,7 @@ void qd_dispatch_free(qd_dispatch_t *dis
  * @param dispatch The dispatch handle returned by qd_dispatch
  * @param config_path The path to the configuration file.
  */
-qd_error_t qd_dispatch_load_config(qd_dispatch_t *dispatch, const char *config_path);
+qd_error_t qd_dispatch_load_config(qd_dispatch_t *qd, const char *config_path);
 
 /**
  * Configure the AMQP container from a configuration entity.
@@ -62,7 +64,7 @@ qd_error_t qd_dispatch_load_config(qd_di
  * @param dispatch The dispatch handle returned by qd_dispatch
  * @param entity The configuration entity.
  */
-qd_error_t qd_dispatch_configure_container(qd_dispatch_t *dispatch, qd_entity_t *entity);
+qd_error_t qd_dispatch_configure_container(qd_dispatch_t *qd, qd_entity_t *entity);
 
 /**
  * Configure the router node from a configuration entity.
@@ -71,7 +73,7 @@ qd_error_t qd_dispatch_configure_contain
  * @param dispatch The dispatch handle returned by qd_dispatch.
  * @param entity The configuration entity.
  */
-qd_error_t qd_dispatch_configure_router(qd_dispatch_t *dispatch, qd_entity_t *entity);
+qd_error_t qd_dispatch_configure_router(qd_dispatch_t *qd, qd_entity_t *entity);
 
 /**
  * Prepare Dispatch for operation.  This must be called prior to
@@ -79,17 +81,17 @@ qd_error_t qd_dispatch_configure_router(
  *
  * @param dispatch The dispatch handle returned by qd_dispatch
  */
-qd_error_t qd_dispatch_prepare(qd_dispatch_t *dispatch);
+qd_error_t qd_dispatch_prepare(qd_dispatch_t *qd);
 
 /**
  * Configure an address, must be called after qd_dispatch_prepare
  */
-qd_error_t qd_dispatch_configure_address(qd_dispatch_t *dispatch, qd_entity_t *entity);
+qd_error_t qd_dispatch_configure_address(qd_dispatch_t *qd, qd_entity_t *entity);
 
 /**
  * Configure a waypoint, must be called after qd_dispatch_prepare
  */
-qd_error_t qd_dispatch_configure_waypoint(qd_dispatch_t *dispatch, qd_entity_t *entity);
+qd_error_t qd_dispatch_configure_waypoint(qd_dispatch_t *qd, qd_entity_t *entity);
 
 /**
  * \brief Configure the logging module from the
@@ -100,6 +102,15 @@ qd_error_t qd_dispatch_configure_waypoin
  */
 qd_error_t qd_dispatch_configure_logging(qd_dispatch_t *qd);
 
+/** Register a managed entity implementation with the management agent.
+ * NOTE: impl must be unregistered before it is freed.
+ */
+void qd_dispatch_register_entity(qd_dispatch_t *qd, const char *type, void *impl);
+
+/** Unregister a managed entity implementation */
+void qd_dispatch_unregister_entity(qd_dispatch_t *qd, void *impl);
+
+
 /**
  * @}
  */

Added: qpid/dispatch/trunk/include/qpid/dispatch/enum.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/enum.h?rev=1632696&view=auto
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/enum.h (added)
+++ qpid/dispatch/trunk/include/qpid/dispatch/enum.h Fri Oct 17 23:35:35 2014
@@ -0,0 +1,57 @@
+#ifndef ENUM_H
+#define ENUM_H
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/static_assert.h>
+
+/**@file
+ *
+ * Tools for working with enums and enum string names.
+ *
+ * Enums should follow this pattern:
+ *
+ * // .h file
+ * typedef enum {
+ *   FOO_VALUE1,
+ *   FOO_VALUE2,
+ *  } foo_t;
+ * ENUM_DECLARE(foo); // Declares const char *foo_name(foo_t value)
+ *
+ * // .c file
+ * static const char *const *foo_names = { "foo-value1", "foo-value2", ... };
+ * ENUM_DEFINE(foo, FOO_ENUM_COUNT, foo_names); // Defines const char *foo_name(foo_t value)
+ */
+
+/** Declares:
+ * const char *NAME_get_name(NAME_t value);  // get name for value or NULL if value is invalid.
+ */
+#define ENUM_DECLARE(NAME) const char *NAME##_name(NAME##_t value)
+
+/** Defines:
+ * const char *NAME_name(NAME_t value)
+ */
+#define ENUM_DEFINE(NAME, NAME_ARRAY)                            \
+    const char *NAME##_name(NAME##_t value) {                           \
+        static const size_t count = sizeof(NAME_ARRAY)/sizeof(NAME_ARRAY[0]); \
+        return (0 <= value && value < count) ? NAME_ARRAY[value] : 0;   \
+    }                                                                   \
+    extern int NAME##__enum_dummy // So macro use can end with a ';'
+
+#endif

Modified: qpid/dispatch/trunk/include/qpid/dispatch/error.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/error.h?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/error.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/error.h Fri Oct 17 23:35:35 2014
@@ -19,6 +19,8 @@
  * under the License.
  */
 
+#include <qpid/dispatch/enum.h>
+
 /** @file
  * Thread-safe error handling mechansim for dispatch.
  *
@@ -44,9 +46,9 @@ typedef enum {
     QD_ERROR_CONFIG,            ///< Error in configuration
     QD_ERROR_TYPE,              ///< Value of inappropriate type.
     QD_ERROR_VALUE,             ///< Invalid value.
-    QD_ERROR_RUNTIME,           ///< Run-time failure.
-    QD_ERROR_ENUM_COUNT         ///< Not an error, marks the end of the enum.
+    QD_ERROR_RUNTIME            ///< Run-time failure.
 } qd_error_t;
+ENUM_DECLARE(qd_error);
 
 /**
  * Store thread-local error code and message.
@@ -54,7 +56,9 @@ typedef enum {
  *@param fmt printf-stlye format.
  *@return code
  */
-qd_error_t qd_error(qd_error_t code, const char *fmt, ...);
+#define qd_error(code, fmt, ...) qd_error_impl(code, __FILE__, __LINE__, fmt, ##__VA_ARGS__)
+
+qd_error_t qd_error_impl(qd_error_t code, const char *file, int line, const char *fmt, ...);
 
 /**
  * Clear thread-local error code and message.
@@ -85,7 +89,9 @@ extern const int QD_ERROR_MAX;
  *
  * @return QD_ERROR_PYTHON or QD_ERROR_NONE.
  */
-qd_error_t qd_error_py();
+#define qd_error_py() qd_error_py_impl(__FILE__, __LINE__)
+
+qd_error_t qd_error_py_impl(const char *file, int line);
 
 #define QD_ERROR_RET() do { if (qd_error_code()) return qd_error_code(); } while(0)
 #define QD_ERROR_PY_RET() do { if (qd_error_py()) return qd_error_code(); } while(0)

Modified: qpid/dispatch/trunk/include/qpid/dispatch/log.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/log.h?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/log.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/log.h Fri Oct 17 23:35:35 2014
@@ -47,8 +47,7 @@ void qd_log_impl(qd_log_source_t *source
  * @param c qd_log_level_t log level of message
  * @param f printf style format string ...
  */
-#define qd_log(s, c, f, ...) \
-    do { if (qd_log_enabled(s,c)) qd_log_impl(s, c, __FILE__, __LINE__, f , ##__VA_ARGS__); } while(0)
+#define qd_log(s, c, f, ...) qd_log_impl(s, c, __FILE__, __LINE__, f , ##__VA_ARGS__)
 
 /** Maximum length for a log message */
 int qd_log_max_len();

Copied: qpid/dispatch/trunk/include/qpid/dispatch/static_assert.h (from r1632169, qpid/dispatch/trunk/src/static_assert.h)
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/static_assert.h?p2=qpid/dispatch/trunk/include/qpid/dispatch/static_assert.h&p1=qpid/dispatch/trunk/src/static_assert.h&r1=1632169&r2=1632696&rev=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/static_assert.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/static_assert.h Fri Oct 17 23:35:35 2014
@@ -38,4 +38,7 @@
     extern char STATIC_ASSERTION__##msg[(expr)?1:2]
 #endif /* #ifdef __GNUC__ */
 
+#define STATIC_ASSERT_ARRAY_LEN(array, len) \
+    STATIC_ASSERT(sizeof(array)/sizeof(array[0]) == len, array##_wrong_size);
+
 #endif // STATIC_ASSERT_H

Modified: qpid/dispatch/trunk/python/qpid_dispatch/management/client.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch/management/client.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch/management/client.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch/management/client.py Fri Oct 17 23:35:35 2014
@@ -21,89 +21,11 @@
 AMQP management client for Qpid dispatch.
 """
 
-import proton, re, threading
+import proton, threading
+from proton import Url
 from .error import *
 from .entity import Entity as BaseEntity, clean_dict
 
-class Url(object):
-    """Simple AMQP URL parser/constructor"""
-
-    URL_RE = re.compile(r"""
-    # [   <scheme>://  ] [   <user>   [    : <password> ]  @]      ( <host4>   | \[    <host6>    \] ) [   :<port>   ]   [ / path]
-    ^ (?: ([^:/@]+)://)? (?: ([^:/@]+) (?: : ([^:/@]+)  )? @)? (?: ([^@:/\[]+) | \[ ([a-f0-9:.]+) \] ) (?: :([0-9]+))? (?: / (.*))? $
-""", re.X | re.I)
-    AMQPS = "amqps"
-    AMQP = "amqp"
-
-    def __init__(self, url=None, **kwargs):
-        """
-        @param url: String or Url instance to parse or copy.
-        @param kwargs: URL fields: scheme, user, password, host, port, path.
-            If specified, replaces corresponding component in url.
-        """
-
-        fields = ['scheme', 'user', 'password', 'host', 'port', 'path']
-
-        for f in fields: setattr(self, f, None)
-        for k in kwargs: getattr(self, k) # Check for invalid kwargs
-
-        if isinstance(url, Url): # Copy from another Url instance.
-            self.__dict__.update(url.__dict__)
-
-        elif url is not None:   # Parse from url
-            match = Url.URL_RE.match(url)
-            if match is None:
-                raise ValueError("Invalid AMQP URL: %s"%url)
-            self.scheme, self.user, self.password, host4, host6, port, self.path = match.groups()
-            self.host = host4 or host6
-            self.port = port and int(port)
-
-        # Let kwargs override values previously set from url
-        for field in fields:
-            setattr(self, field, kwargs.get(field, getattr(self, field)))
-
-    def __repr__(self):
-        return "Url(%r)" % str(self)
-
-    def __str__(self):
-        s = ""
-        if self.scheme:
-            s += "%s://" % self.scheme
-        if self.user:
-            s += self.user
-        if self.password:
-            s += ":%s@" % self.password
-        if self.host and ':' in self.host:
-            s += "[%s]" % self.host
-        else:
-            s += self.host or '0.0.0.0'
-        if self.port:
-            s += ":%s" % self.port
-        if self.path:
-            s += "/%s" % self.path
-        return s
-
-    def __eq__(self, url):
-        if isinstance(url, basestring):
-            url = Url(url)
-        return \
-            self.scheme == url.scheme and \
-            self.user == url.user and self.password == url.password and \
-            self.host == url.host and self.port == url.port and \
-            self.path == url.path
-
-    def __ne__(self, url):
-        return not self.__eq__(url)
-
-    def defaults(self):
-        """"Fill in defaults for scheme and port if missing """
-        if not self.scheme: self.scheme = self.AMQP
-        if not self.port:
-            if self.scheme == self.AMQP: self.port = 5672
-            elif self.scheme == self.AMQPS: self.port = 5671
-            else: raise ValueError("Invalid URL scheme: %s"%self.scheme)
-        return self
-
 class MessengerImpl(object):
     """
     Messaging implementation for L{Node} based on proton.Messenger
@@ -244,10 +166,6 @@ class Node(object):
                     response.properties.get('statusDescription')))
             else:
                 raise ManagementError.create(code, response.properties.get('statusDescription'))
-        if response.correlation_id != request.correlation_id:
-            raise NotFoundStatus("Bad correlation id request=%s, response=%s" %
-                               (request.correlation_id, response.correlation_id))
-
 
     def request(self, body=None, **properties):
         """
@@ -279,7 +197,10 @@ class Node(object):
         if not request.reply_to:
             raise ValueError("Message must have reply_to %s", request)
         self.message_impl.send(request)
-        response = self.message_impl.fetch()
+        while True:
+            response = self.message_impl.fetch()
+            # Ignore mismatched correlation IDs, responses to earlier requests that timed out.
+            if response.correlation_id == request.correlation_id: break
         self.check_response(response, request, expect=expect)
         return response
 

Modified: qpid/dispatch/trunk/python/qpid_dispatch/management/entity.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch/management/entity.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch/management/entity.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch/management/entity.py Fri Oct 17 23:35:35 2014
@@ -77,3 +77,10 @@ class Entity(object):
         self.__delitem__(name)
 
     def __repr__(self): return "Entity(%r)" % self.attributes
+
+def update(entity, values):
+    """Update entity from values
+    @param entity: an Entity
+    @param values: a map of values
+    """
+    for k, v in values.iteritems(): entity[k] = v

Copied: qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json (from r1632169, qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.json)
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json?p2=qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json&p1=qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.json&r1=1632169&r2=1632696&rev=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.json (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json Fri Oct 17 23:35:35 2014
@@ -85,7 +85,7 @@
   },
   "entity_types": {
     "container": {
-      "allows": "CR",
+      "operations": ["CREATE", "READ"],
       "singleton": true,
       "include": [
         "common"
@@ -104,7 +104,7 @@
     },
 
     "router": {
-      "allows": "CR",
+      "operations": ["CREATE", "READ"],
       "singleton": true,
       "include": [
         "common"
@@ -151,13 +151,16 @@
           "type": "Integer",
           "default": 60,
           "description": ""
-        }
+        },
+	"addrCount": {"type": "Integer"},
+	"linkCount": {"type": "Integer"},
+	"nodeCount": {"type": "Integer"}
       }
     },
 
     "listener": {
       "description": "Listens for incoming connections to the router",
-      "allows": "CR",
+      "operations": ["CREATE", "READ"],
       "include": [
         "common",
         "ssl-profile",
@@ -193,7 +196,7 @@
     },
     "connector": {
       "description": "Establishes an outgoing connections from the router",
-      "allows": "CR",
+      "operations": ["CREATE", "READ"],
       "include": [
         "common",
         "ssl-profile",
@@ -219,19 +222,20 @@
       "include": [
         "common"
       ],
-      "allows": "CR",
+      "operations": ["CREATE", "READ"],
       "attributes": {
         "module": {
           "type":[
             "ROUTER",
             "MESSAGE",
             "SERVER",
-            "AGENT",
-	    "PYAGENT",
+            "CAGENT",
+	    "AGENT",
             "CONTAINER",
             "CONFIG",
             "DEFAULT",
-            "ERROR"
+            "ERROR",
+	    "DISPATCH"
           ],
           "required": true,
           "description": "Module to configure logging level. The special module 'DEFAULT' specifies logging for modules that don't have explicit log sections."
@@ -253,7 +257,12 @@
         "timestamp": {
           "type": "Boolean",
           "default": true,
-          "description": "Set a timestamp on log messages"
+          "description": "Include timestamp in log messages"
+        },
+        "source": {
+          "type": "Boolean",
+          "default": false,
+          "description": "Include source file and line number in log messages"
         },
         "output": {
           "type": "String",
@@ -263,7 +272,7 @@
     },
 
     "fixed-address": {
-      "allows": "CR",
+      "operations": ["CREATE", "READ"],
       "include": [
         "common"
       ],
@@ -297,7 +306,7 @@
     },
 
     "waypoint": {
-      "allows": "CR",
+      "operations": ["CREATE", "READ"],
       "include": [
         "common"
       ],
@@ -323,7 +332,7 @@
 
     "dummy": {
       "description": "Dummy entity for test purposes.",
-      "allows": "CRUD",
+      "operations": ["CREATE", "READ", "UPDATE", "DELETE", "CALLME"],
       "include": ["common"],
       "attributes": {
         "arg1": {"type": "String"},
@@ -331,6 +340,84 @@
 	"num1": {"type": "Integer"},
 	"num2": {"type": "Integer"}
       }
+    },
+
+    "router.link": {
+      "description": "Link to another AMQP endpoint: router node, client or other AMQP process.",
+      "operations": ["READ"],
+      "include": ["common"],
+      "attributes": {
+	"linkType": {"type": ["endpoint", "waypoint", "inter-router", "inter-area"]},
+	"linkDir": {"type": ["in", "out"]},
+	"owningAddr": {"type": "String"},
+	"eventFifoDepth": {"type": "Integer"},
+	"msgFifoDepth": {"type": "Integer"}
+      }
+    },
+
+    "router.address": {
+      "description": "AMQP address managed by the router",
+      "operations": ["READ"],
+      "include": ["common"],
+      "attributes": {
+	"inProcess": {"type": "Boolean"},
+	"subscriberCount": {"type": "Integer"},
+	"remoteCount": {"type": "Integer"},
+	"deliveriesIngress": {"type": "Integer"},
+	"deliveriesEgress": {"type": "Integer"},
+	"deliveriesTransit": {"type": "Integer"},
+	"deliveriesToContainer": {"type": "Integer"},
+	"deliveriesFromContainer": {"type": "Integer"}
+      }
+    },
+
+    "router.node": {
+      "description": "AMQP node managed by the router",
+      "operations": ["READ"],
+      "include": ["common"],
+      "attributes": {
+	"addr": {"type": "String"},
+	"nextHop": {"type": "Integer"},
+	"routerLink": {"type": "Integer"},
+	"validOrigins": {"type": "List"}
+      }
+    },
+
+    "connection": {
+      "description": "Connections to the router's container",
+      "operations": ["READ"],
+      "include": ["common"],
+      "attributes": {
+	"container": {"type": "String"} ,
+	"state": {"type": [
+	  "connecting",
+	  "opening",
+	  "operational",
+	  "failed",
+	  "user"
+	]},
+	"host": {"type": "String"},
+	"dir": {"type": ["in", "out"]},
+	"role": {"type": "String"},
+	"sasl": {"type": "String"}
+      }
+    },
+
+    "allocator": {
+      "description": "Memory allocation pool.",
+      "operations": ["READ"],
+      "include": ["common"],
+      "attributes": {
+	"type_size": {"type": "Integer"},
+	"transfer_batch_size": {"type": "Integer"},
+	"local_free_list_max": {"type": "Integer"},
+	"global_free_list_max": {"type": "Integer"},
+	"total_alloc_from_heap": {"type": "Integer"},
+	"total_free_to_heap": {"type": "Integer"},
+	"held_by_threads": {"type": "Integer"},
+	"batches_rebalanced_to_threads": {"type": "Integer"},
+	"batches_rebalanced_to_global": {"type": "Integer"}
+       }
     }
   }
 }

Modified: qpid/dispatch/trunk/python/qpid_dispatch/qpid_dispatch/site_data.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch/qpid_dispatch/site_data.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch/qpid_dispatch/site_data.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch/qpid_dispatch/site_data.py Fri Oct 17 23:35:35 2014
@@ -25,7 +25,5 @@ environment variables. This file will no
 site.py will use env variables instead.
 """
 
-from os.path import join
-
 QPID_DISPATCH_HOME = ""
 QPID_DISPATCH_LIB = ""

Modified: qpid/dispatch/trunk/python/qpid_dispatch/site.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch/site.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch/site.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch/site.py Fri Oct 17 23:35:35 2014
@@ -27,13 +27,10 @@ def get_variable(name):
     """Get variable value  by first checking os.environ, then site_data"""
     value = os.environ.get(name)
     if value: return value
-    try:
-        site_data = __import__('qpid_dispatch.site_data', globals(), locals(), [name])
-        return getattr(site_data, name)
-    except ImportError, e:
-        raise ImportError("%s: Set %s environment variable." % (e, env))
+    site_data = __import__('qpid_dispatch.site_data', globals(), locals(), [name])
+    return getattr(site_data, name)
 
-for var in ['QPID_DISPATCH_HOME', 'QPID_DISPATCH_LIB']:
-    globals()[var] = get_variable(var)
+QPID_DISPATCH_HOME = get_variable('QPID_DISPATCH_HOME')
+QPID_DISPATCH_LIB = get_variable('QPID_DISPATCH_LIB')
 
 sys.path.insert(0, os.path.join(QPID_DISPATCH_HOME, 'python'))

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/dispatch_c.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/dispatch_c.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/dispatch_c.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/dispatch_c.py Fri Oct 17 23:35:35 2014
@@ -19,7 +19,8 @@
 
 """Access to functions in libqpid-dispatch.so"""
 
-import ctypes, os
+import ctypes
+from contextlib import contextmanager
 from ctypes import c_char_p, c_long, py_object
 from qpid_dispatch.site import QPID_DISPATCH_LIB
 
@@ -47,7 +48,6 @@ class QdDll(ctypes.PyDLL):
         super(QdDll, self).__init__(lib)
 
         # Types
-        # TODO aconway 2014-06-27: can we use typed pointers instead of void*?
         self.qd_dispatch_p = ctypes.c_void_p
 
         # No check on qd_error_* functions, it would be recursive
@@ -63,9 +63,23 @@ class QdDll(ctypes.PyDLL):
         self._prototype(self.qd_dispatch_configure_address, None, [self.qd_dispatch_p, py_object])
         self._prototype(self.qd_dispatch_configure_waypoint, None, [self.qd_dispatch_p, py_object])
         self._prototype(self.qd_dispatch_set_agent, None, [self.qd_dispatch_p, py_object])
+
         self._prototype(self.qd_router_setup_late, None, [self.qd_dispatch_p])
+
+        self._prototype(self.qd_dispatch_router_lock, None, [self.qd_dispatch_p])
+        self._prototype(self.qd_dispatch_router_unlock, None, [self.qd_dispatch_p])
+
         self._prototype(self.qd_connection_manager_start, None, [self.qd_dispatch_p])
         self._prototype(self.qd_waypoint_activate_all, None, [self.qd_dispatch_p])
+        self._prototype(self.qd_c_entity_flush, c_long, [py_object])
+
+    @contextmanager
+    def scoped_dispatch_router_lock(self, dispatch):
+        self.qd_dispatch_router_lock(dispatch)
+        try:
+            yield
+        finally:
+            self.qd_dispatch_router_unlock(dispatch)
 
     def _errcheck(self, result, func, args):
         if self.qd_error_code():
@@ -76,6 +90,10 @@ class QdDll(ctypes.PyDLL):
         f.restype = restype
         f.argtypes = argtypes
         if check: f.errcheck = self._errcheck
+        return f
+
+    def function(self, fname, restype, argtypes, check=True):
+        return self._prototype(getattr(self, fname), restype, argtypes, check)
 
 def instance():
     return QdDll.instance()

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/agent.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/management/agent.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/agent.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/management/agent.py Fri Oct 17 23:35:35 2014
@@ -23,15 +23,41 @@ Python agent for dispatch router.
 Implements the server side of the AMQP management protocol for the dispatch router.
 Manages a set of manageable Entities that can be Created, Read, Updated and Deleted.
 Entity types are described by the schema in qdrouter.json.
+
+HOW IT WORKS:
+
+There are 3 types of entity:
+1 .Entities created via this agent, e.g. configuration entities.
+   Attributes are pushed into C code and cached by the agent.
+2. Entities created in C code and registered with the agent.
+   Attributes are pulled from C code before handling an operation.
+3. Combined: created as 1. and later registered via 2.
+   Attributes pulled from C overide the initial attributes.
+
+THREAD SAFETY:
+
+The agent is always driven by requests arriving in connection threads.
+Handling requests is serialized.
+
+Adding/removing/updating entities from C:
+- C code registers add/remove of C entities in a thread safe C cache.
+- Before handling an operation, the agent:
+  1. locks the router
+  2. flushes the add/remove cache and adds/removes entities in python cache.
+  3. updates *all* known entites with a C implementation.
+  4. unlocks the router.
 """
 
+import re
+from itertools import ifilter, chain
 from traceback import format_exc
 from threading import Lock
+from ctypes import c_void_p, py_object, c_long
 from dispatch import IoAdapter, LogAdapter, LOG_DEBUG, LOG_ERROR
 from qpid_dispatch.management.error import ManagementError, OK, CREATED, NO_CONTENT, STATUS_TEXT, \
     BadRequestStatus, InternalServerErrorStatus, NotImplementedStatus, NotFoundStatus
 from .. import dispatch_c
-from .schema import ValidationError, Entity as SchemaEntity
+from .schema import ValidationError, Entity as SchemaEntity, EntityType
 from .qdrouter import QdSchema
 from ..router.message import Message
 
@@ -65,19 +91,30 @@ class Entity(SchemaEntity):
     Base class for agent entities with operations as well as attributes.
     """
 
-    def __init__(self, agent, entity_type, attributes):
+    def _update(self): return False # Replaced by _set_pointer
+
+    def __init__(self, agent, entity_type, attributes=None, validate=True):
         """
-        @para qd: Dispatch C library.
+        @para agent: Containing L{Agent}
         @param dispatch: Pointer to qd_dispatch C object.
         @param entity_type: L{EntityType}
-        @param attribute: Attribute name:value map
+        @param attributes: Attribute name:value map
+        @param pointer: Pointer to C object that can be used to update attributes.
         """
-        super(Entity, self).__init__(entity_type, attributes)
+        super(Entity, self).__init__(entity_type, attributes, validate=validate)
         # Direct __dict__ access to avoid validation as schema attributes
         self.__dict__['_agent'] = agent
         self.__dict__['_qd'] = agent.qd
         self.__dict__['_dispatch'] = agent.dispatch
 
+    def _set_pointer(self, pointer):
+        fname = "qd_c_entity_update_" + self.entity_type.short_name.replace('.', '_')
+        updatefn = self._qd.function(
+            fname, c_long, [py_object, c_void_p])
+        def _do_update():
+            updatefn(self.attributes, pointer);
+            return True
+        self.__dict__['_update'] = _do_update
 
     def create(self, request):
         """Subclasses can add extra create actions here"""
@@ -92,26 +129,26 @@ class Entity(SchemaEntity):
         return (OK, self.attributes)
 
     def update(self, request):
+        """Handle update request with new attributes from management client"""
         newattrs = dict(self.attributes, **request.body)
         self.entity_type.validate(newattrs)
         self.attributes = newattrs
         return (OK, self.attributes)
 
     def delete(self, request):
-        self._agent.delete(self)
+        """Handle delete request from client"""
+        self._agent.remove(self)
         return (NO_CONTENT, {})
 
 
-class ContainerEntity(Entity):
-    def create(self, request):
-        self._qd.qd_dispatch_configure_container(self._dispatch, request.body)
+class ContainerEntity(Entity): pass
 
 
 class RouterEntity(Entity):
-    def create(self, request):
-        self._qd.qd_dispatch_configure_router(self._dispatch, self)
-        self._qd.qd_dispatch_prepare(self._dispatch)
-
+    def __init__(self, *args, **kwargs):
+        kwargs['validate'] = False
+        super(RouterEntity, self).__init__(*args, **kwargs)
+        self._set_pointer(self._dispatch)
 
 class LogEntity(Entity):
     def create(self, request):
@@ -146,7 +183,6 @@ class DummyEntity(Entity):
 
     def create(self, request):
         self['identity'] = self.next_id()
-        return (OK, self.attributes)
 
     def next_id(self): return self.type+str(self.id_count.next())
 
@@ -154,41 +190,187 @@ class DummyEntity(Entity):
         return (OK, dict(**request.properties))
 
 
+class CEntity(Entity):
+    """
+    Entity that is registered from C code rather than created via management.
+    """
+    def __init__(self, agent, entity_type, pointer):
+        super(CEntity, self).__init__(agent, entity_type, validate=False)
+        self._set_pointer(pointer)
+        self._update()
+        if not 'identity' in self.attributes:
+            self.attributes['identity'] = "%s:%s" % (entity_type.short_name, self.id_count.next())
+        self.attributes['identity'] = str(self.attributes['identity'])
+        self.validate()
+
+
+class RouterLinkEntity(CEntity):
+    id_count = AtomicCount()
+
+
+class RouterNodeEntity(CEntity):
+    id_count = AtomicCount()
+
+
+class RouterAddressEntity(CEntity):
+    id_count = AtomicCount()
+
+
+class ConnectionEntity(CEntity):
+    id_count = AtomicCount()
+
+
+class AllocatorEntity(CEntity):
+    id_count = AtomicCount()
+
+
+class EntityCache(object):
+    """
+    Searchable cache of entities, can be updated from C attributes.
+    """
+    def __init__(self, agent):
+        self.entities = []
+        self.pointers = {}
+        self.agent = agent
+        self.qd = self.agent.qd
+        self.schema = agent.schema
+
+    def log(self, *args): self.agent.log(*args)
+
+    def map_filter(self, function, test):
+        """Filter with test then apply function."""
+        return map(function, ifilter(test, self.entities))
+
+    def map_type(self, function, type):
+        """Apply function to all entities of type, if type is None do all entities"""
+        if type is None:
+            return map(function, self.entities)
+        else:
+            if isinstance(type, EntityType): type = type.name
+            else: type = self.schema.long_name(type)
+            return map(function, ifilter(lambda e: e.entity_type.name == type, self.entities))
+
+    def add(self, entity, pointer=None):
+        """Add an entity. Provide pointer if it is associated with a C entity"""
+        self.log(LOG_DEBUG, "Add %s entity: %s" %
+                 (entity.entity_type.short_name, entity.attributes['identity']))
+        # Validate in the context of the existing entities for uniqueness
+        self.schema.validate_all(chain(iter([entity]), iter(self.entities)))
+        self.entities.append(entity)
+        if pointer: self.pointers[pointer] = entity
+
+    def _remove(self, entity):
+        try:
+            self.entities.remove(entity);
+            self.log(LOG_DEBUG, "Remove %s entity: %s" %
+                     (entity.entity_type.short_name, entity.attributes['identity']))
+        except ValueError: pass
+
+    def remove(self, entity):
+        self._remove(entity)
+
+    def remove_pointer(self, pointer):
+        self._remove_pointer()
+
+    def _remove_pointer(self, pointer):
+        if pointer in self.pointers:
+            entity = self.pointers[pointer]
+            del self.pointers[pointer]
+            self._remove(entity)
+
+    def update_from_c(self):
+        """Update entities from the C dispatch runtime"""
+        events = []
+        REMOVE, ADD, REMOVE_ADD = 0, 1, 2
+
+        class Action(object):
+            """Collapse a sequence of add/remove actions down to None, remove, add or remove_add"""
+
+            MATRIX = {          # Collaps pairs of actions
+                (None, ADD): ADD,
+                (None, REMOVE): REMOVE,
+                (REMOVE, ADD): REMOVE_ADD,
+                (ADD, REMOVE): None,
+                (REMOVE_ADD, REMOVE): REMOVE
+            }
+
+            def __init__(self, type):
+                self.action = None
+                self.type = type
+
+            def add(self, action):
+                try: self.action = self.MATRIX[(self.action, action)]
+                except KeyError: pass
+
+
+        with self.qd.scoped_dispatch_router_lock(self.agent.dispatch):
+            self.qd.qd_c_entity_flush(events)
+            # Collapse sequences of add/remove into a single remove/add/remove_add per pointer.
+            actions = {}
+            for action, type, pointer in events:
+                if not pointer in actions: actions[pointer] = Action(type)
+                actions[pointer].add(action)
+            for pointer, action in actions.iteritems():
+                if action.action == REMOVE or action.action == REMOVE_ADD:
+                    self._remove_pointer(pointer)
+                if action.action == ADD or action.action == REMOVE_ADD:
+                    entity_type = self.schema.entity_type(action.type)
+                    klass = self.agent.entity_class(entity_type)
+                    entity = klass(self.agent, entity_type, pointer)
+                    self.add(entity, pointer)
+
+            for e in self.entities: e._update()
+
+
 class Agent(object):
     """AMQP managment agent"""
 
     def __init__(self, dispatch, attribute_maps=None):
         self.qd = dispatch_c.instance()
         self.dispatch = dispatch
-        # FIXME aconway 2014-06-26: merge with $management
-        self.io = [IoAdapter(self.receive, "$management2"),
-                   IoAdapter(self.receive, "$management2", True)] # Global
-        self.log = LogAdapter("PYAGENT").log                      # FIXME aconway 2014-09-08: AGENT
         self.schema = QdSchema()
-        self.entities = [self.create_entity(attributes) for attributes in attribute_maps or []]
+        self.entities = EntityCache(self)
         self.name = self.identity = 'self'
         self.type = 'org.amqp.management' # AMQP management node type
+        for attributes in attribute_maps or []:
+            self.add_entity(self.create_entity(attributes))
+        self.request_lock = Lock()
+
+    def log(self, *args): pass         # Replaced in activate.
+
+    SEP_RE = re.compile(r'-|\.')
+
+    def activate(self, address):
+        """Register the management address to receive management requests"""
+        self.io = [IoAdapter(self.receive, address),
+                   IoAdapter(self.receive, address, True)] # Global
+        self.log = LogAdapter("AGENT").log
+
+    def entity_class(self, entity_type):
+        """Return the class that implements entity_type"""
+        class_name = ''.join([n.capitalize() for n in re.split(self.SEP_RE, entity_type.short_name)])
+        class_name += 'Entity'
+        entity_class = globals().get(class_name)
+        if not entity_class:
+            raise InternalServerErrorStatus("Can't find implementation for %s" % entity_type)
+        return entity_class
 
     def create_entity(self, attributes):
         """Create an instance of the implementation class for an entity"""
         if 'type' not in attributes:
             raise BadRequestStatus("No 'type' attribute in %s" % attributes)
         entity_type = self.schema.entity_type(attributes['type'])
-        class_name = ''.join([n.capitalize() for n in entity_type.short_name.split('-')])
-        class_name += 'Entity'
-        entity_class = globals().get(class_name)
-        if not entity_class:
-            raise InternalServerErrorStatus("Can't find implementation for %s" % entity_type)
-        return entity_class(self, entity_type, attributes)
+        return self.entity_class(entity_type)(self, entity_type, attributes)
 
     def respond(self, request, status=OK, description=None, body=None):
         """Send a response to the client"""
+        if body is None: body = {}
         description = description or STATUS_TEXT[status]
         response = Message(
             address=request.reply_to,
             correlation_id=request.correlation_id,
             properties={'statusCode': status, 'statusDescription': description},
-            body=body or {})
+            body=body)
         self.log(LOG_DEBUG, "Agent response:\n  %s\n  Responding to: \n  %s"%(response, request))
         try:
             self.io[0].send(response)
@@ -197,20 +379,27 @@ class Agent(object):
 
     def receive(self, request, link_id):
         """Called when a management request is received."""
-        self.log(LOG_DEBUG, "Agent request %s on link %s"%(request, link_id))
-        def error(e, trace):
-            """Raise an error"""
-            self.log(LOG_ERROR, "Error dispatching %s: %s\n%s"%(request, e, trace))
-            self.respond(request, e.status, e.description)
-        try:
-            status, body = self.handle(request)
-            self.respond(request, status=status, body=body)
-        except ManagementError, e:
-            error(e, format_exc())
-        except ValidationError, e:
-            error(BadRequestStatus(str(e)), format_exc())
-        except Exception, e:
-            error(InternalServerErrorStatus("%s: %s"%(type(e).__name__, e)), format_exc())
+        # Coarse locking, handle one request at a time.
+        with self.request_lock:
+            self.entities.update_from_c()
+            self.log(LOG_DEBUG, "Agent request %s on link %s"%(request, link_id))
+            def error(e, trace):
+                """Raise an error"""
+                self.log(LOG_ERROR, "Error dispatching %s: %s\n%s"%(request, e, trace))
+                self.respond(request, e.status, e.description)
+            try:
+                status, body = self.handle(request)
+                self.respond(request, status=status, body=body)
+            except ManagementError, e:
+                error(e, format_exc())
+            except ValidationError, e:
+                error(BadRequestStatus(str(e)), format_exc())
+            except Exception, e:
+                error(InternalServerErrorStatus("%s: %s"%(type(e).__name__, e)), format_exc())
+
+    def entity_type(self, type):
+        try: return self.schema.entity_type(type)
+        except ValidationError, e: raise NotFoundStatus(str(e))
 
     def handle(self, request):
         """
@@ -219,39 +408,41 @@ class Agent(object):
         @return: (response-code, body)
         """
         operation = required_property('operation', request)
-        type = request.properties.get('type')
+        type = request.properties.get('type') # Allow absent type for requests with a name.
         if type == self.type or operation.lower() == 'create':
+            # Create requests are entity requests but must be handled by the agent since
+            # the entity does not yet exist.
             target = self
         else:
             target = self.find_entity(request)
             target.entity_type.allowed(operation)
         try:
-            method = getattr(target, operation.lower())
+            method = getattr(target, operation.lower().replace("-", "_"))
         except AttributeError:
             not_implemented(operation, target.type)
         return method(request)
 
+    def requested_type(self, request):
+        type = request.properties.get('entityType')
+        if type: return self.entity_type(type)
+        else: return None
+
     def query(self, request):
         """Management node query operation"""
-        entity_type = request.properties.get('entityType')
-        if entity_type:
-            try:
-                entity_type = self.schema.entity_type(entity_type)
-            except:
-                raise NotFoundStatus("Unknown entity type '%s'" % entity_type)
+        type = self.requested_type(request)
         attribute_names = request.body.get('attributeNames')
         if not attribute_names:
-            if entity_type:
-                attribute_names = entity_type.attributes.keys()
+            if type:
+                attribute_names = type.attributes.keys()
             else:               # Every attribute in the schema!
                 names = set()
                 for e in self.schema.entity_types.itervalues():
                     names.update(e.attributes.keys())
                 attribute_names = list(names)
 
-        results = [[e.attributes.get(a) for a in attribute_names]
-                   for e in self.entities
-                   if not entity_type or e.type == entity_type.name]
+        attributes = self.entities.map_type(lambda e: e.attributes, type)
+        results = [[attrs.get(name) for name in attribute_names]
+                   for attrs in attributes]
         return (OK, {'attributeNames': attribute_names, 'results': results})
 
     def create(self, request):
@@ -268,12 +459,39 @@ class Agent(object):
             attributes[a] = value
         entity = self.create_entity(attributes)
         entity.entity_type.allowed('create')
-        # Validate in the context of the existing entities for uniqueness
-        self.schema.validate_all([entity]+self.entities)
         entity.create(request)  # Send the create request to the entity
-        self.entities.append(entity)
+        self.add_entity(entity)
         return (CREATED, entity.attributes)
 
+    def add_entity(self, entity): self.entities.add(entity)
+
+    def remove(self, entity): self.entities.remove(entity)
+
+    def get_types(self, request):
+        type = self.requested_type(request)
+        return (OK, dict((t, []) for t in self.schema.entity_types
+                         if not type or type.name == t))
+
+    def get_operations(self, request):
+        type = self.requested_type(request)
+        return (OK, dict((t, et.operations)
+                         for t, et in self.schema.entity_types.iteritems()
+                         if not type or type.name == t))
+
+    def get_attributes(self, request):
+        type = self.requested_type(request)
+        return (OK, dict((t, [a for a in et.attributes])
+                         for t, et in self.schema.entity_types.iteritems()
+                         if not type or type.name == t))
+
+    def get_mgmt_nodes(self, request):
+        router = self.entities.map_type(None, 'router')[0]
+        area = router.attributes['area']
+        def node_address(node):
+            return "amqp:/_topo/%s/%s/$management" % (area, node.attributes['addr'][1:])
+        return (OK, self.entities.map_type(node_address, 'router.node'))
+
+
     def find_entity(self, request):
         """Find the entity addressed by request"""
 
@@ -287,7 +505,7 @@ class Agent(object):
             return " ".join(["%s=%r" % (k, v) for k, v in ids.iteritems()])
 
         k, v = ids.iteritems().next() # Get the first id attribute
-        found = [e for e in self.entities if e.attributes.get(k) == v]
+        found = self.entities.map_filter(None, lambda e: e.attributes.get(k) == v)
         if len(found) == 1:
             entity = found[0]
         elif len(found) > 1:
@@ -299,13 +517,12 @@ class Agent(object):
         for k, v in ids.iteritems():
             if entity[k] != v: raise BadRequestStatus("Conflicting %s" % attrvals())
 
+        request_type = request.properties.get('type')
+        if request_type and not entity.entity_type.name_is(request_type):
+            raise NotFoundStatus("Entity type '%s' does match requested type '%s'" %
+                           (entity.entity_type.name, request_type))
+
         return entity
 
     def find_entity_by_type(self, type):
-        return [e for e in self.entities if e.entity_type.name == type]
-
-    def delete(self, entity):
-        try:
-            self.entities.remove(entity)
-        except ValueError:
-            raise NotFoundStatus("Cannot delete, entity not found: %s"%entity)
+        return self.entities.map_type(None, type)

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/config.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/management/config.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/config.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/management/config.py Fri Oct 17 23:35:35 2014
@@ -98,7 +98,7 @@ class Config(object):
             elif 'identity' in attrs:
                 attrs['name'] = attrs['identity']
             else:
-                identity = "%s%d"%(section_name, count)
+                identity = "%s:%d"%(section_name, count)
                 attrs['name'] = attrs['identity'] = identity
         return content
 
@@ -137,6 +137,12 @@ def configure_dispatch(dispatch, filenam
     qd = dispatch_c.instance()
     dispatch = qd.qd_dispatch_p(dispatch)
     config = Config(filename)
+
+    # NOTE: Can't import agent till till dispatch C extension module is initialized.
+    from .agent import Agent
+    agent = Agent(dispatch, config.entities)
+    qd.qd_dispatch_set_agent(dispatch, agent)
+
     # Configure any DEFAULT log entities first so we can report errors in non-
     # default log configurations to the correct place.
     for l in config.by_type('log'):
@@ -147,10 +153,8 @@ def configure_dispatch(dispatch, filenam
     qd.qd_dispatch_configure_router(dispatch, config.by_type('router').next())
     qd.qd_dispatch_prepare(dispatch)
 
-    # NOTE: Can't import agent till after qd_dispatch_prepare
-    from .agent import Agent
-    qd.qd_dispatch_set_agent(dispatch, Agent(dispatch, config.entities))
-    qd.qd_router_setup_late(dispatch); # Must come after qd_dispatch_set_agent
+    agent.activate("$management")
+    qd.qd_router_setup_late(dispatch);
 
     # Note must configure addresses, waypoints, listeners and connectors after qd_dispatch_prepare
     for a in config.by_type('fixed-address'): qd.qd_dispatch_configure_address(dispatch, a)

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/management/qdrouter.py Fri Oct 17 23:35:35 2014
@@ -22,6 +22,7 @@ Qpid Dispatch Router management schema a
 """
 
 import json
+from pkgutil import get_data
 from . import schema
 from ..compat import json_load_kwargs
 
@@ -29,12 +30,10 @@ class QdSchema(schema.Schema):
     """
     Qpid Dispatch Router management schema.
     """
-    SCHEMA_FILE = schema.schema_file("qdrouter.json")
-
     def __init__(self):
         """Load schema."""
-        with open(self.SCHEMA_FILE) as f:
-            super(QdSchema, self).__init__(**json.load(f, **json_load_kwargs))
+        schema = get_data('qpid_dispatch.management', 'qdrouter.json')
+        super(QdSchema, self).__init__(**json.loads(schema, **json_load_kwargs))
 
     def validate(self, entities, full=True, **kwargs):
         """

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py Fri Oct 17 23:35:35 2014
@@ -26,7 +26,7 @@ check for uniqueness of enties/attribute
 A Schema can be loaded/dumped to a json file.
 """
 
-import os, sys
+import sys
 from qpid_dispatch.management import entity
 from qpid_dispatch.management.error import ForbiddenStatus
 from ..compat import OrderedDict
@@ -36,11 +36,6 @@ class ValidationError(Exception):
     pass
 
 
-def schema_file(name):
-    """Return a file name relative to the directory from which this module was loaded."""
-    return os.path.join(os.path.dirname(__file__), name)
-
-
 class Type(object):
     """Base class for schema types.
 
@@ -73,6 +68,7 @@ class Type(object):
         """String name of type."""
         return str(self.dump())
 
+
 class BooleanType(Type):
     """A boolean schema type"""
 
@@ -95,6 +91,7 @@ class BooleanType(Type):
         except:
             raise ValidationError("Invalid Boolean value '%r'"%value)
 
+
 class EnumValue(str):
     """A string that convets to an integer value via int()"""
 
@@ -108,6 +105,7 @@ class EnumValue(str):
     def __ne__(self, x): return not self == x
     def __repr__(self): return "EnumValue('%s', %s)"%(str(self), int(self))
 
+
 class EnumType(Type):
     """An enumerated type"""
 
@@ -146,7 +144,8 @@ class EnumType(Type):
         """String description of enum type."""
         return "One of [%s]"%(', '.join(self.tags))
 
-BUILTIN_TYPES = dict((t.name, t) for t in [Type("String", str), Type("Integer", int), BooleanType()])
+BUILTIN_TYPES = dict((t.name, t) for t in
+                     [Type("String", str), Type("Integer", int), Type("List", list), BooleanType()])
 
 def get_type(rep):
     """
@@ -313,7 +312,7 @@ class EntityType(AttributeTypeHolder):
     #ivar include: List of names of sections included by this entity.
     """
     def __init__(self, name, schema, singleton=False, include=None, attributes=None,
-                 description="", allows=""):
+                 description="", operations=None):
         """
         @param name: name of the entity type.
         @param schema: schema for this type.
@@ -321,14 +320,14 @@ class EntityType(AttributeTypeHolder):
         @param include: List of names of include types for this entity.
         @param attributes: Map of attributes {name: {type:, default:, required:, unique:}}
         @param description: Human readable description.
-        @param allows: Allowed operations, string of "CRUD"
+        @param operations: Allowed operations, list of operation names.
         """
         super(EntityType, self).__init__(name, schema, attributes, description)
         self.short_name = schema.short_name(name)
         self.refs = {'entity-type': name}
         self.singleton = singleton
         self.include = include
-        self.allows = allows.upper()
+        self.operations = operations or []
         if include and self.schema.includes:
             for i in include:
                 if not i in schema.includes:
@@ -399,16 +398,18 @@ class EntityType(AttributeTypeHolder):
 
         return attributes
 
-    def allowed(self, operation):
+    def allowed(self, op):
         """Raise excepiton if op is not a valid operation on entity."""
-        op = operation.upper()
-        if op[0] not in self.allows:
+        op = op.upper()
+        if op not in self.operations:
             raise ForbiddenStatus("Operation '%s' not allowed for '%s'" % (op, self.name))
 
     def __repr__(self): return "%s(%s)" % (type(self).__name__, self.name)
 
     def __str__(self): return self.name
 
+    def name_is(self, name):
+        return self.name == self.schema.long_name(name)
 
 class Schema(object):
     """
@@ -448,7 +449,7 @@ class Schema(object):
     def long_name(self, name):
         """Add prefix to unqualified name"""
         if not name: return name
-        if not '.' in  name:
+        if not name.startswith(self.prefixdot):
             name = self.prefixdot + name
         return name
 
@@ -463,12 +464,15 @@ class Schema(object):
                          for e in self.entity_types.itervalues()))
         ])
 
-    def entity_type(self, name):
-        """Look up an EntityType by name"""
+    def entity_type(self, name, error=True):
+        """Look up an EntityType by name.
+        If error raise exception if not found else return None
+        """
         try:
             return self.entity_types[self.long_name(name)]
         except KeyError:
-            raise ValidationError("No such entity_type %r" % name)
+            if error: raise ValidationError("No such entity type '%s'" % name)
+        return None
 
     def validate_entity(self, attributes, check_required=True, add_default=True,
                         check_unique=None, check_singleton=None):
@@ -514,7 +518,6 @@ class Schema(object):
                                  add_default=add_default,
                                  check_unique=check_unique,
                                  check_singleton=check_singleton)
-        return attribute_maps
 
     def entity(self, attributes):
         """Convert an attribute map into an L{Entity}"""
@@ -528,10 +531,11 @@ class Schema(object):
 
 class Entity(entity.Entity):
     """A map of attributes associated with an L{EntityType}"""
-    def __init__(self, entity_type, attributes=None, **kwattrs):
+    def __init__(self, entity_type, attributes=None, validate=True, **kwattrs):
         super(Entity, self).__init__(attributes, **kwattrs)
         self.__dict__['entity_type'] = entity_type
-        self.validate()
+        self.attributes.setdefault('type', entity_type.name)
+        if validate: self.validate()
 
     def __setitem__(self, name, value):
         super(Entity, self).__setitem__(name, value)

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/configuration.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/configuration.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/configuration.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/configuration.py Fri Oct 17 23:35:35 2014
@@ -25,7 +25,7 @@ class Configuration(object):
         ##
         ## Load default values
         ##
-        self.values = 
+        self.values = {}
 
         ##
         ## Apply supplied overrides

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py Fri Oct 17 23:35:35 2014
@@ -83,10 +83,8 @@ class RouterEngine:
     @property
     def config(self):
         if not self._config:
-            router_type = 'org.apache.qpid.dispatch.router'
-            routers = self.router_adapter.get_agent().find_entity_by_type(router_type)
-            if not routers: raise ValueError("No router configuration found") 
-            self._config = routers[0]
+            try: self._config = self.router_adapter.get_agent().find_entity_by_type('router')[0]
+            except IndexError: raise ValueError("No router configuration found")
         return self._config
 
     def addressAdded(self, addr):

Modified: qpid/dispatch/trunk/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/CMakeLists.txt?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/src/CMakeLists.txt Fri Oct 17 23:35:35 2014
@@ -26,7 +26,7 @@ set(GENERATED_SOURCES
 file (GLOB_RECURSE GENERATOR_DEPENDS
   ${CMAKE_CURRENT_SOURCE_DIR}/schema_c.py
   ${CMAKE_SOURCE_DIR}/python/qpid_router_internal/management/*.py
-  ${CMAKE_SOURCE_DIR}/python/qpid_router_internal/management/qdrouterd.json)
+  ${CMAKE_SOURCE_DIR}/python/qpid_router/management/qdrouterd.json)
 
 add_custom_command (
   OUTPUT ${GENERATED_SOURCES}
@@ -48,6 +48,7 @@ set(qpid_dispatch_SOURCES
   container.c
   dispatch.c
   entity.c
+  c_entity.c
   hash.c
   iovec.c
   iterator.c

Modified: qpid/dispatch/trunk/src/agent.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/agent.c?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/agent.c (original)
+++ qpid/dispatch/trunk/src/agent.c Fri Oct 17 23:35:35 2014
@@ -85,7 +85,7 @@ typedef enum {
 // Convenience for logging, expects agent to be defined.
 #define LOG(LEVEL, MSG, ...) qd_log(agent->log_source, QD_LOG_##LEVEL, MSG, ##__VA_ARGS__)
 
-static const char *AGENT_ADDRESS       = "$management";
+static const char *AGENT_ADDRESS       = "$cmanagement";
 static const char *STATUS_CODE         = "statusCode";
 static const char *STATUS_DESCRIPTION  = "statusDescription";
 static const char *AP_ENTITY_TYPE      = "entityType";
@@ -763,7 +763,7 @@ static qd_agent_class_t *qd_agent_regist
 qd_agent_t *qd_agent(qd_dispatch_t *qd)
 {
     qd_agent_t *agent = NEW(qd_agent_t);
-    agent->log_source = qd_log_source("AGENT");
+    agent->log_source = qd_log_source("CAGENT");
     agent->qd         = qd;
     agent->class_hash = qd_hash(6, 10, 1);
     DEQ_INIT(agent->class_list);

Modified: qpid/dispatch/trunk/src/alloc.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/alloc.c?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/alloc.c (original)
+++ qpid/dispatch/trunk/src/alloc.c Fri Oct 17 23:35:35 2014
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+#include <Python.h>
 #include <qpid/dispatch/alloc.h>
 #include <qpid/dispatch/ctools.h>
 #include <qpid/dispatch/log.h>
@@ -24,6 +25,8 @@
 #include <memory.h>
 #include <inttypes.h>
 #include <stdio.h>
+#include "entity_private.h"
+#include "c_entity.h"
 
 #define QD_MEMORY_DEBUG 1
 
@@ -95,6 +98,7 @@ static void qd_alloc_init(qd_alloc_type_
 
         desc->header  = PATTERN_FRONT;
         desc->trailer = PATTERN_BACK;
+        qd_c_entity_add(QD_ALLOCATOR_TYPE, type_item);
     }
 
     sys_mutex_unlock(init_lock);
@@ -284,6 +288,7 @@ void qd_alloc_finalize(void)
     qd_alloc_item_t *item;
     qd_alloc_type_t *type_item = DEQ_HEAD(type_list);
     while (type_item) {
+        qd_c_entity_remove(QD_ALLOCATOR_TYPE, type_item);
         qd_alloc_type_desc_t *desc = type_item->desc;
 
         //
@@ -342,10 +347,28 @@ void qd_alloc_finalize(void)
 }
 
 
+qd_error_t qd_c_entity_update_allocator(qd_entity_t* entity, void *impl) {
+    qd_alloc_type_t *alloc_type = (qd_alloc_type_t*) impl;
+    if ((qd_entity_has(entity, "identity") ||
+         qd_entity_set_string(entity, "identity", alloc_type->desc->type_name) == 0) &&
+        qd_entity_set_long(entity, "type_size", alloc_type->desc->total_size) == 0 &&
+        qd_entity_set_long(entity, "transfer_batch_size", alloc_type->desc->config->transfer_batch_size) == 0 &&
+        qd_entity_set_long(entity, "local_free_list_max", alloc_type->desc->config->local_free_list_max) == 0 &&
+        qd_entity_set_long(entity, "global_free_list_max", alloc_type->desc->config->global_free_list_max) == 0 &&
+        qd_entity_set_long(entity, "total_alloc_from_heap", alloc_type->desc->stats->total_alloc_from_heap) == 0 &&
+        qd_entity_set_long(entity, "total_free_to_heap", alloc_type->desc->stats->total_free_to_heap) == 0 &&
+        qd_entity_set_long(entity, "held_by_threads", alloc_type->desc->stats->held_by_threads) == 0 &&
+        qd_entity_set_long(entity, "batches_rebalanced_to_threads", alloc_type->desc->stats->batches_rebalanced_to_threads) == 0 &&
+        qd_entity_set_long(entity, "batches_rebalanced_to_global", alloc_type->desc->stats->batches_rebalanced_to_global) == 0)
+        return QD_ERROR_NONE;
+    return qd_error_code();
+}
+
+
 static void alloc_attr_name(void *object_handle, void *cor, void *unused)
 {
-    qd_alloc_type_t *item = (qd_alloc_type_t*) object_handle;
-    qd_agent_value_string(cor, 0, item->desc->type_name);
+    qd_alloc_type_t *alloc_type = (qd_alloc_type_t*) object_handle;
+    qd_agent_value_string(cor, 0, alloc_type->desc->type_name);
 }
 
 
@@ -412,7 +435,6 @@ static void alloc_attr_batches_rebalance
 }
 
 
-static const char *ALLOC_TYPE = "org.apache.qpid.dispatch.allocator";
 static const qd_agent_attribute_t ALLOC_ATTRIBUTES[] =
     {{"name", alloc_attr_name, 0},
      {"identity", alloc_attr_name, 0},
@@ -442,6 +464,14 @@ static void alloc_query_handler(void* co
 
 void qd_alloc_setup_agent(qd_dispatch_t *qd)
 {
-    qd_agent_register_class(qd, ALLOC_TYPE, 0, ALLOC_ATTRIBUTES, alloc_query_handler);
+    qd_agent_register_class(qd, QD_ALLOCATOR_TYPE, 0, ALLOC_ATTRIBUTES, alloc_query_handler);
 }
 
+
+// Entity add/remove event cache.
+
+
+struct event {
+    const char *type;           /* Set for an add event, NULL for a remove event */
+    void *pointer;
+};

Added: qpid/dispatch/trunk/src/aprintf.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/aprintf.h?rev=1632696&view=auto
==============================================================================
--- qpid/dispatch/trunk/src/aprintf.h (added)
+++ qpid/dispatch/trunk/src/aprintf.h Fri Oct 17 23:35:35 2014
@@ -0,0 +1,69 @@
+#ifndef BPRINTF_H
+#define BPRINTF_H
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <errno.h>
+#include <assert.h>
+
+/**
+   Variadic appending printf - see aprintf()
+ */
+static int vaprintf(char **begin, char *end, const char *format, va_list ap_in) {
+    int size = end - *begin;
+    if (size == 0) return EINVAL;
+    va_list ap;
+    va_copy(ap, ap_in);
+    int n = vsnprintf(*begin, size, format, ap);
+    va_end(ap);
+    if (n < 0) return n;
+    if (n >= size) {
+        *begin = end-1;
+        assert(**begin == '\0');
+        return n;
+    }
+    *begin += n;
+    assert(*begin < end);
+    assert(**begin == '\0');
+    return 0;
+}
+
+/**
+   Appending printf.
+
+   Print to buffer at *begin with null terminatr, do not go beyond end.
+   Advance *begin to point to the null terminator.
+.  Return value:
+   - 0 on success: advance *begin to the null terminator.
+   - n > 0: printing was truncated and would have printed n characters. *begin == end-1
+   - n < 0: error (return value of vsnprintf) no change to *begin
+ */
+static int aprintf(char **begin, char *end, const char *format, ...) {
+    va_list ap;
+    va_start(ap, format);
+    int n = vaprintf(begin, end, format, ap);
+    va_end(ap);
+    return n;
+}
+
+
+
+#endif

Added: qpid/dispatch/trunk/src/c_entity.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/c_entity.c?rev=1632696&view=auto
==============================================================================
--- qpid/dispatch/trunk/src/c_entity.c (added)
+++ qpid/dispatch/trunk/src/c_entity.c Fri Oct 17 23:35:35 2014
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/python_embedded.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/ctools.h>
+#include <structmember.h>
+#include "c_entity.h"
+#include "entity_private.h"
+#include "dispatch_private.h"
+#include "router_private.h"
+
+#include <stdbool.h>
+#include <pthread.h>
+
+
+typedef enum { REMOVE=0, ADD=1 }  action_t;
+
+typedef struct entity_event_t {
+    DEQ_LINKS(struct entity_event_t);
+    action_t action;
+    const char *type;
+    void *object;
+} entity_event_t;
+
+DEQ_DECLARE(entity_event_t, entity_event_list_t);
+
+static entity_event_t *entity_event(action_t action, const char *type, void *object) {
+    entity_event_t *event = NEW(entity_event_t);
+    DEQ_ITEM_INIT(event);
+    event->action = action;
+    event->type = type;
+    event->object = object;
+    return event;
+}
+
+static sys_mutex_t *event_lock = 0;
+static entity_event_list_t  event_list;
+
+void qd_c_entity_initialize(void) {
+    event_lock = sys_mutex();
+    DEQ_INIT(event_list);
+}
+
+static void push_event(action_t action, const char *type, void *object) {
+    if (!event_lock) return;    /* Unit tests don't call qd_c_entity_initialize */
+    sys_mutex_lock(event_lock);
+    entity_event_t *event = entity_event(action, type, object);
+    DEQ_INSERT_TAIL(event_list, event);
+    sys_mutex_unlock(event_lock);
+}
+
+void qd_c_entity_add(const char *type, void *object) { push_event(ADD, type, object); }
+
+void qd_c_entity_remove(const char *type, void *object) { push_event(REMOVE, type, object); }
+
+// Flush events in the add/remove cache into a python list of (action, type, pointer)
+qd_error_t qd_c_entity_flush(PyObject *list) {
+    if (!event_lock) return QD_ERROR_NONE;    /* Unit tests don't call qd_c_entity_initialize */
+    qd_error_clear();
+    sys_mutex_lock(event_lock);
+    fprintf(stderr, "qd_c_entity_flush %d\n", (int)DEQ_SIZE(event_list));
+    entity_event_t *event = DEQ_HEAD(event_list);
+    while (event) {
+        PyObject *tuple = Py_BuildValue("(isl)", (int)event->action, event->type, (long)event->object);
+        if (!tuple) { fprintf(stderr, "No tuple"); qd_error_py(); break; }
+        int err = PyList_Append(list, tuple);
+        Py_DECREF(tuple);
+        if (err) { fprintf(stderr, "No tuple"); qd_error_py(); break; }
+        DEQ_REMOVE_HEAD(event_list);
+        free(event);
+        event = DEQ_HEAD(event_list);
+    }
+    sys_mutex_unlock(event_lock);
+    return qd_error_code();
+}
+
+const char *QD_ALLOCATOR_TYPE = "allocator";
+const char *QD_CONNECTION_TYPE = "connection";
+const char *QD_ROUTER_TYPE = "router";
+const char *QD_ROUTER_NODE_TYPE = "router.node";
+const char *QD_ROUTER_ADDRESS_TYPE = "router.address";
+const char *QD_ROUTER_LINK_TYPE = "router.link";

Copied: qpid/dispatch/trunk/src/c_entity.h (from r1632169, qpid/dispatch/trunk/src/static_assert.h)
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/c_entity.h?p2=qpid/dispatch/trunk/src/c_entity.h&p1=qpid/dispatch/trunk/src/static_assert.h&r1=1632169&r2=1632696&rev=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/static_assert.h (original)
+++ qpid/dispatch/trunk/src/c_entity.h Fri Oct 17 23:35:35 2014
@@ -1,6 +1,5 @@
-#ifndef STATIC_ASSERT_H
-#define STATIC_ASSERT_H
-
+#ifndef C_ENTITY_H
+#define C_ENTITY_H 1
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -21,21 +20,28 @@
  */
 
 /** @file
- * STATIC_ASSERT allows you to do compile time assertions at file scope or in a function.
- * @param expr: a boolean expression that is valid at compile time.
- * @param msg: a "message" that must also be a valid identifier, i.e. message_with_underscores
+ *
+ * Python-C interface for managed entity implementation objects.
+ *
+ * Maintain a cache of entity add and remove events that can be read by the
+ * python agent.
  */
 
+/** Initialize the module. */
+void qd_c_entity_initialize(void);
+
+/** Record an entity add event. */
+void qd_c_entity_add(const char *type, void *object);
+
+/** Record an entity remove event. */
+void qd_c_entity_remove(const char *type, void *object);
 
-#ifdef __GNUC__
-#define STATIC_ASSERT_HELPER(expr, msg) \
-    (!!sizeof(struct { unsigned int STATIC_ASSERTION__##msg: (expr) ? 1 : -1; }))
-#define STATIC_ASSERT(expr, msg) \
-    extern int (*assert_function__(void)) [STATIC_ASSERT_HELPER(expr, msg)]
-#else
-    #define STATIC_ASSERT(expr, msg)   \
-    extern char STATIC_ASSERTION__##msg[1]; \
-    extern char STATIC_ASSERTION__##msg[(expr)?1:2]
-#endif /* #ifdef __GNUC__ */
+/** Entity type strings */
+extern const char *QD_ALLOCATOR_TYPE;
+extern const char *QD_CONNECTION_TYPE;
+extern const char *QD_ROUTER_TYPE;
+extern const char *QD_ROUTER_NODE_TYPE;
+extern const char *QD_ROUTER_ADDRESS_TYPE;
+extern const char *QD_ROUTER_LINK_TYPE;
 
-#endif // STATIC_ASSERT_H
+#endif

Modified: qpid/dispatch/trunk/src/connection_manager.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/connection_manager.c?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/connection_manager.c (original)
+++ qpid/dispatch/trunk/src/connection_manager.c Fri Oct 17 23:35:35 2014
@@ -23,6 +23,7 @@
 #include "dispatch_private.h"
 #include "server_private.h"
 #include "entity_private.h"
+#include "c_entity.h"
 #include "schema_enum.h"
 #include <string.h>
 
@@ -337,7 +338,6 @@ static void cm_attr_dir(void *object_han
 }
 
 
-static const char *CONN_TYPE = "org.apache.qpid.dispatch.connection";
 static const qd_agent_attribute_t CONN_ATTRIBUTES[] =
     {{"name", cm_attr_name, 0},
      {"identity", cm_attr_name, 0},
@@ -349,7 +349,6 @@ static const qd_agent_attribute_t CONN_A
      {"dir", cm_attr_dir, 0},
      {0, 0, 0}};
 
-
 static void server_query_handler(void* context, void *cor)
 {
     qd_dispatch_t *qd        = (qd_dispatch_t*) context;
@@ -365,10 +364,9 @@ static void server_query_handler(void* c
     sys_mutex_unlock(qd_server->lock);
 }
 
-
 void qd_connection_manager_setup_agent(qd_dispatch_t *qd)
 {
-    qd_agent_register_class(qd, CONN_TYPE, qd, CONN_ATTRIBUTES, server_query_handler);
+    qd_agent_register_class(qd, QD_CONNECTION_TYPE, qd, CONN_ATTRIBUTES, server_query_handler);
 }
 
 

Modified: qpid/dispatch/trunk/src/container.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Fri Oct 17 23:35:35 2014
@@ -440,9 +440,9 @@ qd_container_t *qd_container(qd_dispatch
     DEQ_INIT(container->nodes);
     DEQ_INIT(container->node_type_list);
 
-    qd_log(container->log_source, QD_LOG_TRACE, "Container Initializing");
     qd_server_set_conn_handler(qd, handler, container);
 
+    qd_log(container->log_source, QD_LOG_TRACE, "Container Initialized");
     return container;
 }
 

Modified: qpid/dispatch/trunk/src/dispatch.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/dispatch.c?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/dispatch.c (original)
+++ qpid/dispatch/trunk/src/dispatch.c Fri Oct 17 23:35:35 2014
@@ -22,6 +22,8 @@
 #include <qpid/dispatch.h>
 #include <qpid/dispatch/server.h>
 #include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/static_assert.h>
+
 #include "dispatch_private.h"
 #include "alloc_private.h"
 #include "log_private.h"
@@ -29,12 +31,12 @@
 #include "waypoint_private.h"
 #include "message_private.h"
 #include "entity_private.h"
-#include "static_assert.h"
+#include "c_entity.h"
 
 /**
  * Private Function Prototypes
  */
-qd_server_t    *qd_server(int tc, const char *container_name);
+qd_server_t    *qd_server(qd_dispatch_t *qd, int tc, const char *container_name);
 void            qd_connection_manager_setup_agent(qd_dispatch_t *qd);
 void            qd_server_free(qd_server_t *server);
 qd_container_t *qd_container(qd_dispatch_t *qd);
@@ -49,11 +51,10 @@ void            qd_error_initialize();
 
 qd_dispatch_t *qd_dispatch(const char *python_pkgdir)
 {
-    qd_error_clear();
     qd_dispatch_t *qd = NEW(qd_dispatch_t);
     memset(qd, 0, sizeof(qd_dispatch_t));
 
-    // alloc, log and error have to be initialized before any module.
+    qd_c_entity_initialize();   /* Must be first */
     qd_alloc_initialize();
     qd_log_initialize();
     qd_error_initialize();
@@ -67,6 +68,7 @@ qd_dispatch_t *qd_dispatch(const char *p
     if (qd_error_code()) { qd_dispatch_free(qd); return 0; }
     qd_message_initialize();
     if (qd_error_code()) { qd_dispatch_free(qd); return 0; }
+    qd->log_source = qd_log_source("DISPATCH");
     return qd;
 }
 
@@ -76,15 +78,15 @@ STATIC_ASSERT(sizeof(long) >= sizeof(voi
 
 qd_error_t qd_dispatch_load_config(qd_dispatch_t *qd, const char *config_path)
 {
-    PyObject *module=0, *configure_dispatch=0, *result=0;
-    bool ok =
-        (module = PyImport_ImportModule("qpid_dispatch_internal.management.config")) &&
-	(configure_dispatch = PyObject_GetAttrString(module, "configure_dispatch")) &&
-	(result = PyObject_CallFunction(configure_dispatch, "(ls)", (long)qd, config_path));
-    Py_XDECREF(module);
-    Py_XDECREF(configure_dispatch);
-    Py_XDECREF(result);
-    return ok ? QD_ERROR_NONE : qd_error_py();
+    PyObject *module = PyImport_ImportModule("qpid_dispatch_internal.management.config");
+    if (!module) return qd_error_py();
+    PyObject *configure_dispatch = PyObject_GetAttrString(module, "configure_dispatch");
+    Py_DECREF(module);
+    if (!configure_dispatch) return qd_error_py();
+    PyObject *result = PyObject_CallFunction(configure_dispatch, "(ls)", (long)qd, config_path);
+    Py_DECREF(configure_dispatch);
+    if (!result) return qd_error_py();
+    return QD_ERROR_NONE;
 }
 
 
@@ -121,7 +123,7 @@ qd_error_t qd_dispatch_configure_waypoin
 
 qd_error_t qd_dispatch_prepare(qd_dispatch_t *qd)
 {
-    qd->server             = qd_server(qd->thread_count, qd->container_name);
+    qd->server             = qd_server(qd, qd->thread_count, qd->container_name);
     qd->container          = qd_container(qd);
     qd->router             = qd_router(qd, qd->router_mode, qd->router_area, qd->router_id);
     qd->agent              = qd_agent(qd);
@@ -155,3 +157,7 @@ void qd_dispatch_free(qd_dispatch_t *qd)
     qd_alloc_finalize();
     qd_python_finalize();
 }
+
+
+void qd_dispatch_router_lock(qd_dispatch_t *qd) { sys_mutex_lock(qd->router->lock); }
+void qd_dispatch_router_unlock(qd_dispatch_t *qd) { sys_mutex_unlock(qd->router->lock); }

Modified: qpid/dispatch/trunk/src/dispatch_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/dispatch_private.h?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/dispatch_private.h (original)
+++ qpid/dispatch/trunk/src/dispatch_private.h Fri Oct 17 23:35:35 2014
@@ -55,6 +55,8 @@ struct qd_dispatch_t {
     char  *router_area;
     char  *router_id;
     qd_router_mode_t  router_mode;
+
+    qd_log_source_t *log_source;
 };
 
 void qd_dispatch_set_agent(qd_dispatch_t *qd, void *agent);

Modified: qpid/dispatch/trunk/src/entity.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/entity.c?rev=1632696&r1=1632695&r2=1632696&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/entity.c (original)
+++ qpid/dispatch/trunk/src/entity.c Fri Oct 17 23:35:35 2014
@@ -50,12 +50,12 @@ char *qd_entity_string(qd_entity_t *enti
     qd_error_clear();
     PyObject *py_obj = qd_entity_get_py(entity, attribute);
     PyObject *py_str = py_obj ? PyObject_Str(py_obj) : NULL;
-    char* str = py_str ? PyString_AsString(py_str) : NULL;
+    const char *cstr = py_str ? PyString_AsString(py_str) : NULL;
+    char* str = cstr ? strdup(cstr) : NULL;
     Py_XDECREF(py_obj);
     Py_XDECREF(py_str);
-    if (str) return strdup(str);
-    qd_error_py();
-    return NULL;
+    if (!str) qd_error_py();
+    return str;
 }
 
 long qd_entity_long(qd_entity_t *entity, const char* attribute) {
@@ -102,3 +102,83 @@ bool qd_entity_opt_bool(qd_entity_t *ent
     }
     return default_value;
 }
+
+
+/**
+ * Set a value for an entity attribute. If py_value == NULL then clear the attribute.
+ * If the attribute exists and is a list, append this value to the list.
+ *
+ * NOTE: This function will Py_XDECREF(py_value).
+ */
+qd_error_t qd_entity_set_py(qd_entity_t* entity, const char* attribute, PyObject* py_value) {
+    qd_error_clear();
+
+    int result = 0;
+    PyObject *py_key = PyString_FromString(attribute);
+    if (py_key) {
+        if (py_value == NULL) {     /* Delete the attribute */
+            result = PyObject_DelItem((PyObject*)entity, py_key);
+            PyErr_Clear();          /* Ignore error if it isn't there. */
+        }
+        else {
+            PyObject *old = PyObject_GetItem((PyObject*)entity, py_key);
+            PyErr_Clear();          /* Ignore error if it isn't there. */
+            if (old && PyList_Check(old)) /* Add to list */
+                result = PyList_Append(old, py_value);
+            else                    /* Set attribute */
+                result = PyObject_SetItem((PyObject*)entity, py_key, py_value);
+            Py_XDECREF(old);
+        }
+    }
+    Py_XDECREF(py_key);
+    Py_XDECREF(py_value);
+    return (py_key == NULL || result < 0) ? qd_error_py() : QD_ERROR_NONE;
+}
+
+qd_error_t qd_entity_set_string(qd_entity_t *entity, const char* attribute, const char *value) {
+    return qd_entity_set_py(entity, attribute, value ? PyString_FromString(value) : 0);
+}
+
+qd_error_t qd_entity_set_longp(qd_entity_t *entity, const char* attribute, const long *value) {
+    return qd_entity_set_py(entity, attribute, value ? PyInt_FromLong(*value) : 0);
+}
+
+qd_error_t qd_entity_set_boolp(qd_entity_t *entity, const char *attribute, const bool *value) {
+    return qd_entity_set_py(entity, attribute, value ? PyBool_FromLong(*value) : 0);
+}
+
+qd_error_t qd_entity_set_long(qd_entity_t *entity, const char* attribute, long value) {
+    return qd_entity_set_longp(entity, attribute, &value);
+}
+
+qd_error_t qd_entity_set_bool(qd_entity_t *entity, const char *attribute, bool value) {
+    return qd_entity_set_boolp(entity, attribute, &value);
+}
+
+qd_error_t qd_entity_clear(qd_entity_t *entity, const char *attribute) {
+    return qd_entity_set_py(entity, attribute, 0);
+}
+
+#define CHECK(err) if (err) return qd_error_code()
+
+qd_error_t qd_entity_set_list(qd_entity_t *entity, const char *attribute) {
+    CHECK(qd_entity_clear(entity, attribute));
+    return qd_entity_set_py(entity, attribute, PyList_New(0));
+}
+
+qd_error_t qd_entity_set_stringf(qd_entity_t *entity, const char* attribute, const char *format, ...)
+{
+    // Calculate the size
+    char dummy[1];
+    va_list ap;
+    va_start(ap, format);
+    int len = vsnprintf(dummy, 1, format, ap);
+    va_end(ap);
+
+    char buf[len+1];
+    va_start(ap, format);
+    vsnprintf(buf, len+1, format, ap);
+    va_end(ap);
+
+    return qd_entity_set_string(entity, attribute, buf);
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org