You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/11/06 21:42:06 UTC
qpid-dispatch git commit: DISPATCH-179 - Initial coding to introduce
a C agent that will handle certain requests and forward the rest to the
Python Agent
Repository: qpid-dispatch
Updated Branches:
refs/heads/tross-DISPATCH-179-1 b3b3d79df -> cf319fd21
DISPATCH-179 - Initial coding to introduce a C agent that will handle certain requests and forward the rest to the Python Agent
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/cf319fd2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/cf319fd2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/cf319fd2
Branch: refs/heads/tross-DISPATCH-179-1
Commit: cf319fd215e8d2f4e4b09d5333dbc75fc2a96d22
Parents: b3b3d79
Author: ganeshmurthy <gm...@redhat.com>
Authored: Fri Nov 6 15:28:20 2015 -0500
Committer: ganeshmurthy <gm...@redhat.com>
Committed: Fri Nov 6 15:28:20 2015 -0500
----------------------------------------------------------------------
.../qpid_dispatch_internal/management/config.py | 3 +-
src/CMakeLists.txt | 1 +
src/router_core/agent.c | 4 +-
src/router_core/agent_address.c | 1 +
src/router_core/management_agent.c | 349 +++++++++++++++++++
src/router_core/management_agent_private.h | 25 ++
src/router_node.c | 8 +
src/server.c | 1 +
8 files changed, 389 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cf319fd2/python/qpid_dispatch_internal/management/config.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/config.py b/python/qpid_dispatch_internal/management/config.py
index 73725c1..0ab9bbb 100644
--- a/python/qpid_dispatch_internal/management/config.py
+++ b/python/qpid_dispatch_internal/management/config.py
@@ -138,6 +138,7 @@ def configure_dispatch(dispatch, lib_handle, filename):
for l in config.by_type('log'):
configure(l)
modules.remove(l["module"])
+
# Add default entities for any log modules not configured.
for m in modules: agent.configure(attributes=dict(type="log", module=m))
@@ -145,7 +146,7 @@ def configure_dispatch(dispatch, lib_handle, filename):
configure(config.by_type('container')[0])
configure(config.by_type('router')[0])
qd.qd_dispatch_prepare(dispatch)
- agent.activate("$management")
+ #agent.activate("$management_internal")
qd.qd_router_setup_late(dispatch) # Actions requiring active management agent.
# Remaining configuration
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cf319fd2/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 5168803..a55b73b 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -71,6 +71,7 @@ set(qpid_dispatch_SOURCES
router_core/router_core.c
router_core/router_core_thread.c
router_core/route_tables.c
+ router_core/management_agent.c
router_delivery.c
router_node.c
router_forwarders.c
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cf319fd2/src/router_core/agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent.c b/src/router_core/agent.c
index 11f8fc2..9eabf6b 100644
--- a/src/router_core/agent.c
+++ b/src/router_core/agent.c
@@ -102,7 +102,7 @@ qdr_query_t *qdr_manage_query(qdr_core_t *core, void *context, qd_router_entity_
switch (query->entity_type) {
case QD_ROUTER_CONNECTION: break;
case QD_ROUTER_LINK: break;
- case QD_ROUTER_ADDRESS: qdra_address_set_columns(query, attribute_names);
+ case QD_ROUTER_ADDRESS: qdra_address_set_columns(query, attribute_names);break;
case QD_ROUTER_WAYPOINT: break;
case QD_ROUTER_EXCHANGE: break;
case QD_ROUTER_BINDING: break;
@@ -117,7 +117,7 @@ void qdr_query_add_attribute_names(qdr_query_t *query)
switch (query->entity_type) {
case QD_ROUTER_CONNECTION: break;
case QD_ROUTER_LINK: break;
- case QD_ROUTER_ADDRESS: qdra_address_emit_columns(query);
+ case QD_ROUTER_ADDRESS: qdra_address_emit_columns(query); break;
case QD_ROUTER_WAYPOINT: break;
case QD_ROUTER_EXCHANGE: break;
case QD_ROUTER_BINDING: break;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cf319fd2/src/router_core/agent_address.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_address.c b/src/router_core/agent_address.c
index 94a0659..88a60f7 100644
--- a/src/router_core/agent_address.c
+++ b/src/router_core/agent_address.c
@@ -111,6 +111,7 @@ static void qdr_manage_write_address_CT(qdr_query_t *query, qdr_address_t *addr)
qd_compose_insert_null(body);
break;
}
+ i++;
}
qd_compose_end_list(body);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cf319fd2/src/router_core/management_agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/management_agent.c b/src/router_core/management_agent.c
new file mode 100644
index 0000000..848ca66
--- /dev/null
+++ b/src/router_core/management_agent.c
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <qpid/dispatch/parse.h>
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/router.h>
+#include <qpid/dispatch/router_core.h>
+#include <qpid/dispatch/compose.h>
+#include <qpid/dispatch/dispatch.h>
+#include "management_agent_private.h"
+#include "dispatch_private.h"
+#include "alloc.h"
+
+const char *entity_type_key = "entityType"; // application-property key read by can_handle_request() to select the entity type
+const char *count_key = "count"; // application-property key: requested number of rows (treated as -1 when absent)
+const char *offset_key = "offset"; // application-property key: starting row offset (treated as 0 when absent)
+
+const char *operation_type_key = "operation"; // application-property key naming the management operation
+const char *attribute_names_key = "attributeNames"; // body-map key: looked up in the request body and emitted in the response body
+
+const unsigned char *address_entity_type = (unsigned char*) "org.apache.qpid.dispatch.router.address"; // maps to QD_ROUTER_ADDRESS
+const unsigned char *link_entity_type = (unsigned char*) "org.apache.qpid.dispatch.router.link"; // maps to QD_ROUTER_LINK
+
+const char * const status_description = "statusDescription"; // response application-property key
+const char * const correlation_id = "correlation-id"; // NOTE(review): shadowed by a local of the same name in core_agent_query_handler(); no other use visible here
+const char * const results = "results"; // response body-map key that holds the list of result rows
+const char * const status_code = "statusCode"; // response application-property key
+
+const char * MANAGEMENT_INTERNAL = "$management_internal"; // address that requests not handled by this C agent are forwarded to
+
+//TODO - Move these to amqp.h. These are the operation names compared against the "operation" property in can_handle_request().
+const unsigned char *MANAGEMENT_QUERY = (unsigned char*) "QUERY";
+const unsigned char *MANAGEMENT_CREATE = (unsigned char*) "CREATE";
+const unsigned char *MANAGEMENT_READ = (unsigned char*) "READ";
+const unsigned char *MANAGEMENT_UPDATE = (unsigned char*) "UPDATE";
+const unsigned char *MANAGEMENT_DELETE = (unsigned char*) "DELETE";
+
+
+typedef enum { // Management operation requested by a message; filled in by can_handle_request() and switched on by management_agent_handler()
+ QD_ROUTER_OPERATION_QUERY, // "operation" property equals "QUERY"
+ QD_ROUTER_OPERATION_CREATE, // "operation" property equals "CREATE"
+ QD_ROUTER_OPERATION_READ, // "operation" property equals "READ"
+ QD_ROUTER_OPERATION_UPDATE, // "operation" property equals "UPDATE"
+ QD_ROUTER_OPERATION_DELETE, // "operation" property equals "DELETE"
+} qd_router_operation_type_t;
+
+
+typedef struct qd_management_context_t { // Per-request state carried through the query callbacks until the response is sent
+ qd_message_t *msg; // response message; composed, sent and freed in qd_compose_send()
+ qd_composed_field_t *field; // composed content of the response (properties, application properties, body)
+ qdr_query_t *query; // handle returned by qdr_manage_query(); used to request further rows
+ qd_dispatch_t *qd; // dispatch instance used when sending the response
+ qd_field_iterator_t *to; // address the response is sent to (the request's reply-to); freed in qd_compose_send()
+ int count; // number of rows requested; -1 when the request carried no "count"
+ int current_count; // number of rows counted so far by manage_response_handler()
+} qd_management_context_t ;
+
+ALLOC_DECLARE(qd_management_context_t); // presumably declares new_qd_management_context_t()/free_qd_management_context_t(), which this file calls -- confirm in alloc.h
+ALLOC_DEFINE(qd_management_context_t);
+
+/**
+ * Allocate a qd_management_context_t and populate every member from the
+ * arguments. The running row counter (current_count) always starts at zero.
+ *
+ * @param msg    message stored in the context
+ * @param field  composed field stored in the context
+ * @param query  query handle stored in the context (may be null)
+ * @param to     iterator stored as the destination address
+ * @param qd     dispatch instance stored in the context
+ * @param count  number of rows requested
+ * @return the newly allocated and initialized context
+ */
+static qd_management_context_t* qd_management_context(qd_message_t *msg,
+                                                      qd_composed_field_t *field,
+                                                      qdr_query_t *query,
+                                                      qd_field_iterator_t *to,
+                                                      qd_dispatch_t *qd,
+                                                      int count)
+{
+    qd_management_context_t *context = new_qd_management_context_t();
+
+    context->msg           = msg;
+    context->field         = field;
+    context->query         = query;  // a null query is simply stored as null
+    context->qd            = qd;
+    context->to            = to;
+    context->count         = count;
+    context->current_count = 0;
+
+    return context;
+}
+
+static void qd_compose_send(qd_management_context_t *ctx) // Finish the response body, send it to ctx->to, then release what the context holds and the context itself.
+{
+ qd_compose_end_list(ctx->field); // closes the "results" list started in core_agent_query_handler()
+ qd_compose_end_map(ctx->field); // closes the body map started in core_agent_query_handler()
+ qd_message_compose_2(ctx->msg, ctx->field);
+ qd_router_send(ctx->qd, ctx->to, ctx->msg);
+
+ // We have come to the very end. Free the appropriate memory.
+ // NOTE(review): the original author states that ctx->field was already freed by qd_compose_end_list(ctx->field) and that ctx->query was already freed; neither release is visible in this file -- confirm against the compose/query APIs, otherwise both leak.
+ // ctx itself is released on the last line, so callers must not touch it after this function returns.
+ qd_message_free(ctx->msg);
+ qd_field_iterator_free(ctx->to);
+ free_qd_management_context_t(ctx);
+}
+
+
+/**
+ * Callback registered with qdr_manage_handler(); invoked with the request
+ * context each time the query reports progress.
+ *
+ * The response is closed and sent when any of these holds:
+ *   - there are no more rows (more is false),
+ *   - the requested count is zero,
+ *   - the number of rows counted so far has reached the requested count.
+ * Otherwise the next row is requested.
+ *
+ * @param context  the qd_management_context_t created for this request
+ * @param status   result status of the query step (currently not inspected)
+ * @param more     true when further rows are available
+ */
+static void manage_response_handler (void *context, const qd_amqp_error_t *status, bool more)
+{
+    qd_management_context_t *request = (qd_management_context_t*) context;
+
+    //TODO - Check the status (qd_amqp_error_t) here first. If the status is anything other than 200, the message needs to be sent back with that status.
+
+    bool finished = !more || request->count == 0;
+
+    if (!finished) {
+        // A row is at hand: count it and see whether the requested total has been reached.
+        request->current_count++;
+        finished = (request->count == request->current_count);
+    }
+
+    if (finished)
+        qd_compose_send(request);   // closes the list, sends the message and frees the context
+    else
+        qdr_query_get_next(request->query);
+}
+
+/**
+ * Handle a management QUERY: build the response skeleton (properties,
+ * application properties with a 200/OK status, and the start of the body map)
+ * and start the core query. Rows are appended and the message is sent from
+ * manage_response_handler() / qd_compose_send().
+ *
+ * @param qd           dispatch instance
+ * @param entity_type  entity type to query
+ * @param msg          incoming request; correlation-id, reply-to and body are read from it
+ * @param count        pointer to the number of rows requested
+ * @param offset       pointer to the offset of the first row to return
+ */
+static void core_agent_query_handler(qd_dispatch_t *qd,
+                                     qd_router_entity_type_t entity_type,
+                                     qd_message_t *msg,
+                                     int *count,
+                                     int *offset)
+{
+    qdr_core_t *core = qd_router_core(qd);
+
+    // Create a new message (the response)
+    qd_message_t *message = qd_message();
+
+    // Named corr_id_iter so it does not shadow the file-scope correlation_id string.
+    qd_field_iterator_t *corr_id_iter = qd_message_field_iterator_typed(msg, QD_FIELD_CORRELATION_ID);
+
+    // Grab the reply_to field from the incoming message. This is the address we will send the response to.
+    // The iterator is stored in the context below and freed in qd_compose_send().
+    // NOTE(review): a request without a reply-to is not handled specially here -- confirm the iterator cannot be null.
+    qd_field_iterator_t *reply_to = qd_message_field_iterator(msg, QD_FIELD_REPLY_TO);
+
+    qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
+    qd_compose_start_list(field);
+    qd_compose_insert_null(field);                          // message-id
+    qd_compose_insert_null(field);                          // user-id
+    qd_compose_insert_string_iterator(field, reply_to);     // to
+    qd_compose_insert_null(field);                          // subject
+    qd_compose_insert_null(field);                          // reply-to
+    qd_compose_insert_typed_iterator(field, corr_id_iter);  // correlation-id
+    qd_compose_end_list(field);
+
+    // The correlation-id has been written into the composed field and the
+    // iterator is not used again, so release it here instead of leaking it.
+    if (corr_id_iter)
+        qd_field_iterator_free(corr_id_iter);
+
+    // Get the attributeNames
+    qd_parsed_field_t *attribute_names_parsed_field = 0;
+
+    // NOTE(review): neither the body iterator nor the parsed body is freed in this file.
+    // attribute_names_parsed_field points into the parsed body and is handed to
+    // qdr_manage_query(), so confirm how long the core keeps it before adding a free.
+    qd_parsed_field_t *body = qd_parse(qd_message_field_iterator(msg, QD_FIELD_BODY));
+
+    if (body != 0 && qd_parse_is_map(body))
+        attribute_names_parsed_field = qd_parse_value_by_key(body, attribute_names_key);
+
+    //
+    // Insert application property map with statusDescription of OK and status code of 200
+    //
+    field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, field);
+    qd_compose_start_map(field);
+
+    // Insert {'statusDescription': 'OK'}
+    qd_compose_insert_string(field, status_description);
+    qd_compose_insert_string(field, QD_AMQP_OK.description);
+
+    // Insert {'statusCode': '200'}
+    qd_compose_insert_string(field, status_code);
+    qd_compose_insert_uint(field, QD_AMQP_OK.status);
+
+    qd_compose_end_map(field);
+
+    //
+    // Add Body
+    //
+    field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, field);
+
+    // Start a map in the body; it is closed in qd_compose_send()
+    qd_compose_start_map(field);
+
+    qd_compose_insert_string(field, attribute_names_key); //add a "attributeNames" key
+
+    // Set the callback function.
+    qdr_manage_handler(core, manage_response_handler);
+
+    // The context is created without a query because the query needs the context as its callback argument.
+    qd_management_context_t *ctx = qd_management_context(message, field, 0, reply_to, qd, (*count));
+
+    ctx->query = qdr_manage_query(core, ctx, entity_type, attribute_names_parsed_field, field);
+
+    //Add the attribute names
+    qdr_query_add_attribute_names(ctx->query);
+
+    qd_compose_insert_string(field, results); //add a "results" key
+    qd_compose_start_list(field); //start the list for results; it is closed in qd_compose_send()
+
+    qdr_query_get_first(ctx->query, (*offset));
+}
+
+/**
+ * Placeholder for the CREATE operation. The body is intentionally empty, so a
+ * CREATE request dispatched here currently does nothing.
+ */
+static void core_agent_create_handler(void)
+{
+}
+
+/**
+ * Placeholder for the READ operation. The body is intentionally empty, so a
+ * READ request dispatched here currently does nothing.
+ */
+static void core_agent_read_handler(void)
+{
+}
+
+/**
+ * Placeholder for the UPDATE operation. The body is intentionally empty, so an
+ * UPDATE request dispatched here currently does nothing.
+ */
+static void core_agent_update_handler(void)
+{
+}
+
+/**
+ * Placeholder for the DELETE operation. The body is intentionally empty, so a
+ * DELETE request dispatched here currently does nothing.
+ */
+static void core_agent_delete_handler(void)
+{
+}
+
+/**
+ * Checks the content of the message to see if this can be handled by this agent
+ * and, if so, extracts the request parameters from the application properties.
+ *
+ * @param props           iterator over the message's application-properties section
+ * @param entity_type     [out] QD_ROUTER_ADDRESS or QD_ROUTER_LINK
+ * @param operation_type  [out] the operation named by the "operation" property
+ * @param count           [out] value of the "count" property, or -1 when absent
+ * @param offset          [out] value of the "offset" property, or 0 when absent
+ * @return true when the properties are a map naming a supported entity type and a
+ *         known operation; false otherwise. The outputs are only meaningful when
+ *         true is returned.
+ */
+static bool can_handle_request(qd_field_iterator_t *props,
+                               qd_router_entity_type_t *entity_type,
+                               qd_router_operation_type_t *operation_type,
+                               int *count,
+                               int *offset)
+{
+    bool               handled      = false;
+    qd_parsed_field_t *parsed_field = 0;
+    qd_parsed_field_t *fld          = qd_parse(props);
+
+    // There must be a property field and that property field should be an AMQP map. This is true for QUERY but
+    // it still needs to be checked whether it is true for CREATE, UPDATE and DELETE
+    if (fld == 0 || !qd_parse_is_map(fld))
+        goto done;
+
+    //
+    // Only certain entity types can be handled by this agent.
+    // 'entityType': 'org.apache.qpid.dispatch.router.address'
+    // 'entityType': 'org.apache.qpid.dispatch.router.link'
+    // TODO - Add more entity types here. The above is not a complete list.
+    //
+    parsed_field = qd_parse_value_by_key(fld, entity_type_key);
+    if (parsed_field == 0)
+        goto done;
+
+    if (qd_field_iterator_equal(qd_parse_raw(parsed_field), address_entity_type))
+        (*entity_type) = QD_ROUTER_ADDRESS;
+    else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), link_entity_type))
+        (*entity_type) = QD_ROUTER_LINK;
+    else
+        goto done;
+
+    parsed_field = qd_parse_value_by_key(fld, operation_type_key);
+    if (parsed_field == 0)
+        goto done;
+
+    if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_QUERY))
+        (*operation_type) = QD_ROUTER_OPERATION_QUERY;
+    else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_CREATE))
+        (*operation_type) = QD_ROUTER_OPERATION_CREATE;
+    else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_READ))
+        (*operation_type) = QD_ROUTER_OPERATION_READ;
+    else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_UPDATE))
+        (*operation_type) = QD_ROUTER_OPERATION_UPDATE;
+    else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_DELETE))
+        (*operation_type) = QD_ROUTER_OPERATION_DELETE;
+    else
+        // This is an unknown operation type. It cannot be handled.
+        goto done;
+
+    // Obtain the count and offset.
+    parsed_field = qd_parse_value_by_key(fld, count_key);
+    if (parsed_field)
+        (*count) = qd_parse_as_int(parsed_field);
+    else
+        (*count) = -1;
+
+    parsed_field = qd_parse_value_by_key(fld, offset_key);
+    if (parsed_field)
+        (*offset) = qd_parse_as_int(parsed_field);
+    else
+        (*offset) = 0;
+
+    handled = true;
+
+done:
+    // Every value looked up above is a child of fld, so releasing the root on
+    // every exit path is what frees the parse tree. Everything the caller needs
+    // has already been copied out by value.
+    if (fld)
+        qd_parse_free(fld);
+
+    return handled;
+}
+
+/**
+ * Handler for the management agent.
+ *
+ * Inspects the application properties of an incoming management request. If
+ * can_handle_request() accepts it, the request is dispatched to the handler
+ * for its operation; otherwise it is forwarded to MANAGEMENT_INTERNAL so the
+ * Python agent can process it.
+ *
+ * @param context  the qd_dispatch_t that was supplied when the handler was registered
+ * @param msg      the incoming management request
+ * @param link_id  not used by this handler
+ */
+void management_agent_handler(void *context, qd_message_t *msg, int link_id)
+{
+    qd_dispatch_t       *qd   = (qd_dispatch_t*) context;
+    qd_field_iterator_t *iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES);
+
+    qd_router_entity_type_t    entity_type    = 0;
+    qd_router_operation_type_t operation_type = 0;
+
+    // Plain int: both are passed by address to can_handle_request() and
+    // core_agent_query_handler(), which take int *.
+    int count  = 0;
+    int offset = 0;
+
+    if (can_handle_request(iter, &entity_type, &operation_type, &count, &offset)) {
+        switch (operation_type) {
+        case QD_ROUTER_OPERATION_QUERY:
+            core_agent_query_handler(qd, entity_type, msg, &count, &offset);
+            break;
+        case QD_ROUTER_OPERATION_CREATE:
+            core_agent_create_handler();
+            break;
+        case QD_ROUTER_OPERATION_READ:
+            core_agent_read_handler();
+            break;
+        case QD_ROUTER_OPERATION_UPDATE:
+            core_agent_update_handler();
+            break;
+        case QD_ROUTER_OPERATION_DELETE:
+            core_agent_delete_handler();
+            break;
+        default:
+            // can_handle_request() only returns true for the operations above.
+            break;
+        }
+    }
+    else {
+        // The C management agent is not going to handle this request. Forward it off to Python.
+        // TODO - This is wrong. Need to find out how the message can be forwarded to $management_internal so it can be handled by Python.
+        qd_router_send2(qd, MANAGEMENT_INTERNAL, msg);
+    }
+
+    qd_field_iterator_free(iter);
+}
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cf319fd2/src/router_core/management_agent_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/management_agent_private.h b/src/router_core/management_agent_private.h
new file mode 100644
index 0000000..c321ca8
--- /dev/null
+++ b/src/router_core/management_agent_private.h
@@ -0,0 +1,25 @@
+#ifndef qd_router_core_agent_private
+#define qd_router_core_agent_private 1
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+void management_agent_handler(void *context, qd_message_t *msg, int link_id); // Entry point of the C management agent: handles the request in msg or forwards it to the Python agent; context is the qd_dispatch_t passed at registration. NOTE(review): qd_message_t must already be declared by whoever includes this header.
+
+#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cf319fd2/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 6779817..8057f2b 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -27,10 +27,12 @@
#include "entity_cache.h"
#include "router_private.h"
#include "waypoint_private.h"
+#include "router_core/management_agent_private.h"
const char *QD_ROUTER_NODE_TYPE = "router.node";
const char *QD_ROUTER_ADDRESS_TYPE = "router.address";
const char *QD_ROUTER_LINK_TYPE = "router.link";
+const char *CORE_AGENT_ADDRESS = "$management";
static char *router_role = "inter-router";
static char *on_demand_role = "on-demand";
@@ -1870,6 +1872,10 @@ void qd_router_setup_late(qd_dispatch_t *qd)
qd_router_python_setup(qd->router);
qd->router->router_core = qdr_core(qd);
qd_timer_schedule(qd->router->timer, 1000);
+
+ //Register the C management agent
+ qd_router_register_address(qd, CORE_AGENT_ADDRESS, management_agent_handler, (void *) qd, QD_SEMANTICS_DEFAULT, true, 0/*forwarder*/);
+ qd_router_register_address(qd, CORE_AGENT_ADDRESS, management_agent_handler, (void *) qd, QD_SEMANTICS_DEFAULT, false, 0/*forwarder*/);
}
void qd_router_free(qd_router_t *router)
@@ -1991,12 +1997,14 @@ void qd_router_send(qd_dispatch_t *qd,
qd_field_iterator_t *address,
qd_message_t *msg)
{
+
qd_router_t *router = qd->router;
qd_address_t *addr;
qd_address_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH);
sys_mutex_lock(router->lock);
qd_hash_retrieve(router->addr_hash, address, (void*) &addr);
+
if (addr) {
//
// Forward to all of the local links receiving this address.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cf319fd2/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index 623a587..96703bc 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1403,6 +1403,7 @@ bool qd_user_fd_is_writeable(qd_user_fd_t *ufd)
void qd_server_timer_pending_LH(qd_timer_t *timer)
{
DEQ_INSERT_TAIL(timer->server->pending_timers, timer);
+ qdpn_driver_wakeup(timer->server->driver);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org