You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/11/06 21:42:06 UTC

qpid-dispatch git commit: DISPATCH-179 - Initial coding to introduce a C agent that will handle certain requests and forward the rest to the Python Agent

Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 b3b3d79df -> cf319fd21


DISPATCH-179 - Initial coding to introduce a C agent that will handle certain requests and forward the rest to the Python Agent


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/cf319fd2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/cf319fd2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/cf319fd2

Branch: refs/heads/tross-DISPATCH-179-1
Commit: cf319fd215e8d2f4e4b09d5333dbc75fc2a96d22
Parents: b3b3d79
Author: ganeshmurthy <gm...@redhat.com>
Authored: Fri Nov 6 15:28:20 2015 -0500
Committer: ganeshmurthy <gm...@redhat.com>
Committed: Fri Nov 6 15:28:20 2015 -0500

----------------------------------------------------------------------
 .../qpid_dispatch_internal/management/config.py |   3 +-
 src/CMakeLists.txt                              |   1 +
 src/router_core/agent.c                         |   4 +-
 src/router_core/agent_address.c                 |   1 +
 src/router_core/management_agent.c              | 349 +++++++++++++++++++
 src/router_core/management_agent_private.h      |  25 ++
 src/router_node.c                               |   8 +
 src/server.c                                    |   1 +
 8 files changed, 389 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cf319fd2/python/qpid_dispatch_internal/management/config.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/config.py b/python/qpid_dispatch_internal/management/config.py
index 73725c1..0ab9bbb 100644
--- a/python/qpid_dispatch_internal/management/config.py
+++ b/python/qpid_dispatch_internal/management/config.py
@@ -138,6 +138,7 @@ def configure_dispatch(dispatch, lib_handle, filename):
     for l in config.by_type('log'):
         configure(l)
         modules.remove(l["module"])
+
     # Add default entities for any log modules not configured.
     for m in modules: agent.configure(attributes=dict(type="log", module=m))
 
@@ -145,7 +146,7 @@ def configure_dispatch(dispatch, lib_handle, filename):
     configure(config.by_type('container')[0])
     configure(config.by_type('router')[0])
     qd.qd_dispatch_prepare(dispatch)
-    agent.activate("$management")
+    #agent.activate("$management_internal")
     qd.qd_router_setup_late(dispatch) # Actions requiring active management agent.
 
     # Remaining configuration

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cf319fd2/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 5168803..a55b73b 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -71,6 +71,7 @@ set(qpid_dispatch_SOURCES
   router_core/router_core.c
   router_core/router_core_thread.c
   router_core/route_tables.c
+  router_core/management_agent.c
   router_delivery.c
   router_node.c
   router_forwarders.c

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cf319fd2/src/router_core/agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent.c b/src/router_core/agent.c
index 11f8fc2..9eabf6b 100644
--- a/src/router_core/agent.c
+++ b/src/router_core/agent.c
@@ -102,7 +102,7 @@ qdr_query_t *qdr_manage_query(qdr_core_t *core, void *context, qd_router_entity_
     switch (query->entity_type) {
     case QD_ROUTER_CONNECTION: break;
     case QD_ROUTER_LINK:       break;
-    case QD_ROUTER_ADDRESS:    qdra_address_set_columns(query, attribute_names);
+    case QD_ROUTER_ADDRESS:    qdra_address_set_columns(query, attribute_names);break;
     case QD_ROUTER_WAYPOINT:   break;
     case QD_ROUTER_EXCHANGE:   break;
     case QD_ROUTER_BINDING:    break;
@@ -117,7 +117,7 @@ void qdr_query_add_attribute_names(qdr_query_t *query)
     switch (query->entity_type) {
     case QD_ROUTER_CONNECTION: break;
     case QD_ROUTER_LINK:       break;
-    case QD_ROUTER_ADDRESS:    qdra_address_emit_columns(query);
+    case QD_ROUTER_ADDRESS:    qdra_address_emit_columns(query); break;
     case QD_ROUTER_WAYPOINT:   break;
     case QD_ROUTER_EXCHANGE:   break;
     case QD_ROUTER_BINDING:    break;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cf319fd2/src/router_core/agent_address.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_address.c b/src/router_core/agent_address.c
index 94a0659..88a60f7 100644
--- a/src/router_core/agent_address.c
+++ b/src/router_core/agent_address.c
@@ -111,6 +111,7 @@ static void qdr_manage_write_address_CT(qdr_query_t *query, qdr_address_t *addr)
             qd_compose_insert_null(body);
             break;
         }
+        i++;
     }
     qd_compose_end_list(body);
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cf319fd2/src/router_core/management_agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/management_agent.c b/src/router_core/management_agent.c
new file mode 100644
index 0000000..848ca66
--- /dev/null
+++ b/src/router_core/management_agent.c
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <qpid/dispatch/parse.h>
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/router.h>
+#include <qpid/dispatch/router_core.h>
+#include <qpid/dispatch/compose.h>
+#include <qpid/dispatch/dispatch.h>
+#include "management_agent_private.h"
+#include "dispatch_private.h"
+#include "alloc.h"
+
+const char *entity_type_key = "entityType";
+const char *count_key = "count";
+const char *offset_key = "offset";
+
+const char *operation_type_key = "operation";
+const char *attribute_names_key = "attributeNames";
+
+const unsigned char *address_entity_type = (unsigned char*) "org.apache.qpid.dispatch.router.address";
+const unsigned char *link_entity_type    = (unsigned char*) "org.apache.qpid.dispatch.router.link";
+
+const char * const status_description = "statusDescription";
+const char * const correlation_id = "correlation-id";
+const char * const results = "results";
+const char * const status_code = "statusCode";
+
+const char * MANAGEMENT_INTERNAL = "$management_internal";
+
+//TODO - Move these to amqp.h
+const unsigned char *MANAGEMENT_QUERY = (unsigned char*) "QUERY";
+const unsigned char *MANAGEMENT_CREATE = (unsigned char*) "CREATE";
+const unsigned char *MANAGEMENT_READ = (unsigned char*) "READ";
+const unsigned char *MANAGEMENT_UPDATE = (unsigned char*) "UPDATE";
+const unsigned char *MANAGEMENT_DELETE = (unsigned char*) "DELETE";
+
+
+typedef enum {
+    QD_ROUTER_OPERATION_QUERY,
+    QD_ROUTER_OPERATION_CREATE,
+    QD_ROUTER_OPERATION_READ,
+    QD_ROUTER_OPERATION_UPDATE,
+    QD_ROUTER_OPERATION_DELETE,
+} qd_router_operation_type_t;
+
+
+typedef struct qd_management_context_t {
+    qd_message_t        *msg;
+    qd_composed_field_t *field;
+    qdr_query_t         *query;
+    qd_dispatch_t       *qd;
+    qd_field_iterator_t *to;
+    int                 count;
+    int                 current_count;
+} qd_management_context_t ;
+
+ALLOC_DECLARE(qd_management_context_t);
+ALLOC_DEFINE(qd_management_context_t);
+
+/**
+ * Convenience function to create and initialize context (qd_management_context_t)
+ */
+static qd_management_context_t* qd_management_context(qd_message_t         *msg,
+                                                      qd_composed_field_t  *field,
+                                                      qdr_query_t          *query,
+                                                      qd_field_iterator_t  *to,
+                                                      qd_dispatch_t        *qd,
+                                                      int                  count)
+{
+    qd_management_context_t *ctx = new_qd_management_context_t();
+    ctx->count = count;
+    ctx->field = field;
+    ctx->msg   = msg;
+    if (query)
+        ctx->query = query;
+    else
+        ctx->query = 0;
+    ctx->current_count = 0;
+    ctx->qd = qd;
+    ctx->to = to;
+
+    return ctx;
+}
+
+static void qd_compose_send(qd_management_context_t *ctx)
+{
+    qd_compose_end_list(ctx->field);
+    qd_compose_end_map(ctx->field);
+    qd_message_compose_2(ctx->msg, ctx->field);
+    qd_router_send(ctx->qd, ctx->to, ctx->msg);
+
+    //We have come to the very end. Free the appropriate memory.
+    //ctx->field has already been freed in the call to qd_compose_end_list(ctx->field)
+    //ctx->query has also been already freed
+    qd_message_free(ctx->msg);
+    qd_field_iterator_free(ctx->to);
+    free_qd_management_context_t(ctx);
+}
+
+
+static void manage_response_handler (void *context, const qd_amqp_error_t *status, bool more)
+{
+    qd_management_context_t *ctx = (qd_management_context_t*) context;
+
+    //TODO - Check the status (qd_amqp_error_t) here first. If the status is anything other that 200, you need to send it back the message with the status.
+
+    if (!more || ctx->count == 0) {
+       // If Count is zero or there are no more rows to process or the status returned is something other than
+       // QD_AMQP_OK, we will close the list, send the message and
+       qd_compose_send(ctx);
+    }
+    else {
+        ctx->current_count++; // Increment how many you have at hand
+
+        if (ctx->count == ctx->current_count) //The count has matched, we are done, close the list and send out the message
+            qd_compose_send(ctx);
+        else
+            qdr_query_get_next(ctx->query);
+    }
+}
+
+static void core_agent_query_handler(qd_dispatch_t           *qd,
+                                     qd_router_entity_type_t entity_type,
+                                     qd_message_t            *msg,
+                                     int                     *count,
+                                     int                     *offset)
+{
+    qdr_core_t *core = qd_router_core(qd);
+
+    // Create a new message
+    qd_message_t *message = qd_message();
+    qd_field_iterator_t *correlation_id = qd_message_field_iterator_typed(msg, QD_FIELD_CORRELATION_ID);
+    // Grab the reply_to field from the incoming message. This is the address we will send the response to.
+    qd_field_iterator_t *reply_to = qd_message_field_iterator(msg, QD_FIELD_REPLY_TO);
+
+    qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
+    qd_compose_start_list(field);
+    qd_compose_insert_null(field);                           // message-id
+    qd_compose_insert_null(field);                           // user-id
+    qd_compose_insert_string_iterator(field, reply_to);       // to
+    qd_compose_insert_null(field);                           // subject
+    qd_compose_insert_null(field);
+    qd_compose_insert_typed_iterator(field, correlation_id);
+    qd_compose_end_list(field);
+
+
+    // Get the attributeNames
+    qd_parsed_field_t *attribute_names_parsed_field = 0;
+
+    qd_parsed_field_t *body = qd_parse(qd_message_field_iterator(msg, QD_FIELD_BODY));
+
+    if (body != 0 && qd_parse_is_map(body))
+        attribute_names_parsed_field = qd_parse_value_by_key(body, attribute_names_key);
+
+    //
+    // Insert application property map with statusDescription of OK and status code of 200
+    //
+    field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, field);
+    qd_compose_start_map(field);
+
+    // Insert {'statusDescription': 'OK'}
+    qd_compose_insert_string(field, status_description);
+    qd_compose_insert_string(field, QD_AMQP_OK.description);
+
+    // Insert {'statusCode': '200'}
+    qd_compose_insert_string(field, status_code);
+    qd_compose_insert_uint(field, QD_AMQP_OK.status);
+
+    qd_compose_end_map(field);
+
+    //
+    // Add Body
+    //
+    field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, field);
+
+    // Start a map in the body
+    qd_compose_start_map(field);
+
+    qd_compose_insert_string(field, attribute_names_key); //add a "attributeNames" key
+
+    // Set the callback function.
+    qdr_manage_handler(core, manage_response_handler);
+
+    qd_management_context_t *ctx = qd_management_context(message, field, 0, reply_to, qd, (*count));
+
+    ctx->query = qdr_manage_query(core, ctx, entity_type, attribute_names_parsed_field, field);
+
+    //Add the attribute names
+    qdr_query_add_attribute_names(ctx->query);
+
+    qd_compose_insert_string(field, results); //add a "results" key
+    qd_compose_start_list(field); //start the list for results
+
+    qdr_query_get_first(ctx->query, (*offset));
+}
+
+static void core_agent_create_handler()
+{
+
+}
+
+static void core_agent_read_handler()
+{
+
+}
+
+static void core_agent_update_handler()
+{
+
+}
+
+static void core_agent_delete_handler()
+{
+
+}
+
+/**
+ * Checks the content of the message to see if this can be handled by this agent.
+ */
+static bool can_handle_request(qd_field_iterator_t *props,
+                               qd_router_entity_type_t *entity_type,
+                               qd_router_operation_type_t *operation_type,
+                               int *count,
+                               int *offset)
+{
+    qd_parsed_field_t *fld = qd_parse(props);
+
+    // The must be a property field and that property field should be a AMQP map. This is true for QUERY but I need
+    // to check if it true for CREATE, UPDATE and DELETE
+    if (fld == 0 || !qd_parse_is_map(fld))
+        return false;
+
+    //
+    // Only certain entity types can be handled by this agent.
+    // 'entityType': 'org.apache.qpid.dispatch.router.address
+    // 'entityType': 'org.apache.qpid.dispatch.router.link'
+    // TODO - Add more entity types here. The above is not a complete list.
+
+    qd_parsed_field_t *parsed_field = qd_parse_value_by_key(fld, entity_type_key);
+
+    if (parsed_field == 0)
+        return false;
+
+    if (qd_field_iterator_equal(qd_parse_raw(parsed_field), address_entity_type))
+        (*entity_type) = QD_ROUTER_ADDRESS;
+    else if(qd_field_iterator_equal(qd_parse_raw(parsed_field), link_entity_type))
+        (*entity_type) = QD_ROUTER_LINK;
+    else
+        return false;
+
+
+    parsed_field = qd_parse_value_by_key(fld, operation_type_key);
+
+    if (parsed_field == 0)
+        return false;
+
+    if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_QUERY))
+        (*operation_type) = QD_ROUTER_OPERATION_QUERY;
+    else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_CREATE))
+        (*operation_type) = QD_ROUTER_OPERATION_CREATE;
+    else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_READ))
+        (*operation_type) = QD_ROUTER_OPERATION_READ;
+    else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_UPDATE))
+        (*operation_type) = QD_ROUTER_OPERATION_UPDATE;
+    else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_DELETE))
+        (*operation_type) = QD_ROUTER_OPERATION_DELETE;
+    else
+        // This is an unknown operation type. cannot be handled, return false.
+        return false;
+
+    // Obtain the count and offset.
+    parsed_field = qd_parse_value_by_key(fld, count_key);
+    if (parsed_field)
+        (*count) = qd_parse_as_int(parsed_field);
+    else
+        (*count) = -1;
+
+    parsed_field = qd_parse_value_by_key(fld, offset_key);
+    if (parsed_field)
+        (*offset) = qd_parse_as_int(parsed_field);
+    else
+        (*offset) = 0;
+
+    qd_parse_free(parsed_field);
+
+    return true;
+}
+
+/**
+ *
+ * Handler for the management agent.
+ *
+ */
+void management_agent_handler(void *context, qd_message_t *msg, int link_id)
+{
+    qd_dispatch_t *qd = (qd_dispatch_t*) context;
+    qd_field_iterator_t *iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES);
+
+    qd_router_entity_type_t    entity_type = 0;
+    qd_router_operation_type_t operation_type = 0;
+
+    int32_t count = 0;
+    int32_t offset = 0;
+
+    if (can_handle_request(iter, &entity_type, &operation_type, &count, &offset)) {
+        switch (operation_type) {
+            case QD_ROUTER_OPERATION_QUERY:
+                core_agent_query_handler(qd, entity_type, msg, &count, &offset);
+                break;
+            case QD_ROUTER_OPERATION_CREATE:
+                core_agent_create_handler();
+                break;
+            case QD_ROUTER_OPERATION_READ:
+                core_agent_read_handler();
+                break;
+            case QD_ROUTER_OPERATION_UPDATE:
+                core_agent_update_handler();
+                break;
+            case QD_ROUTER_OPERATION_DELETE:
+                core_agent_delete_handler();
+                break;
+       }
+    }
+    else
+        qd_router_send2(qd, MANAGEMENT_INTERNAL, msg); //the C management agent is not going to handle this request. Forward it off to Python.
+    // TODO - This is wrong. Need to find out how I can forward off the message to $management_internal so it can be handled by Python.
+
+    qd_field_iterator_free(iter);
+
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cf319fd2/src/router_core/management_agent_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/management_agent_private.h b/src/router_core/management_agent_private.h
new file mode 100644
index 0000000..c321ca8
--- /dev/null
+++ b/src/router_core/management_agent_private.h
@@ -0,0 +1,25 @@
+#ifndef qd_router_core_agent_private
+#define qd_router_core_agent_private 1
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+void management_agent_handler(void *context, qd_message_t *msg, int link_id);
+
+#endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cf319fd2/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 6779817..8057f2b 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -27,10 +27,12 @@
 #include "entity_cache.h"
 #include "router_private.h"
 #include "waypoint_private.h"
+#include "router_core/management_agent_private.h"
 
 const char *QD_ROUTER_NODE_TYPE = "router.node";
 const char *QD_ROUTER_ADDRESS_TYPE = "router.address";
 const char *QD_ROUTER_LINK_TYPE = "router.link";
+const char *CORE_AGENT_ADDRESS = "$management";
 
 static char *router_role    = "inter-router";
 static char *on_demand_role = "on-demand";
@@ -1870,6 +1872,10 @@ void qd_router_setup_late(qd_dispatch_t *qd)
     qd_router_python_setup(qd->router);
     qd->router->router_core = qdr_core(qd);
     qd_timer_schedule(qd->router->timer, 1000);
+
+    //Register the C management agent
+    qd_router_register_address(qd, CORE_AGENT_ADDRESS, management_agent_handler, (void *) qd, QD_SEMANTICS_DEFAULT, true, 0/*forwarder*/);
+    qd_router_register_address(qd, CORE_AGENT_ADDRESS, management_agent_handler, (void *) qd, QD_SEMANTICS_DEFAULT, false, 0/*forwarder*/);
 }
 
 void qd_router_free(qd_router_t *router)
@@ -1991,12 +1997,14 @@ void qd_router_send(qd_dispatch_t       *qd,
                     qd_field_iterator_t *address,
                     qd_message_t        *msg)
 {
+
     qd_router_t  *router = qd->router;
     qd_address_t *addr;
 
     qd_address_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH);
     sys_mutex_lock(router->lock);
     qd_hash_retrieve(router->addr_hash, address, (void*) &addr);
+
     if (addr) {
         //
         // Forward to all of the local links receiving this address.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cf319fd2/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index 623a587..96703bc 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1403,6 +1403,7 @@ bool qd_user_fd_is_writeable(qd_user_fd_t *ufd)
 void qd_server_timer_pending_LH(qd_timer_t *timer)
 {
     DEQ_INSERT_TAIL(timer->server->pending_timers, timer);
+    qdpn_driver_wakeup(timer->server->driver);
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org