You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/06/26 22:12:07 UTC
svn commit: r1497070 - in /qpid/trunk/qpid/extras/dispatch:
include/qpid/dispatch/compose.h include/qpid/dispatch/router.h src/agent.c
src/compose.c src/message.c src/message_private.h src/router_node.c
Author: tross
Date: Wed Jun 26 20:12:07 2013
New Revision: 1497070
URL: http://svn.apache.org/r1497070
Log:
NO-JIRA - Management agent in the container now responds to "get" requests.
Modified:
qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/compose.h
qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h
qpid/trunk/qpid/extras/dispatch/src/agent.c
qpid/trunk/qpid/extras/dispatch/src/compose.c
qpid/trunk/qpid/extras/dispatch/src/message.c
qpid/trunk/qpid/extras/dispatch/src/message_private.h
qpid/trunk/qpid/extras/dispatch/src/router_node.c
Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/compose.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/compose.h?rev=1497070&r1=1497069&r2=1497070&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/compose.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/compose.h Wed Jun 26 20:12:07 2013
@@ -20,6 +20,7 @@
*/
#include <qpid/dispatch/buffer.h>
+#include <qpid/dispatch/iterator.h>
typedef struct dx_composed_field_t dx_composed_field_t;
@@ -144,7 +145,7 @@ void dx_compose_insert_timestamp(dx_comp
* @param field A field created by dx_compose.
* @param value The pointer to the first octet in the UUID to be inserted.
*/
-void dx_compose_insert_uuid(dx_composed_field_t *field, const char *value);
+void dx_compose_insert_uuid(dx_composed_field_t *field, const uint8_t *value);
/**
* Insert a binary blob into the field.
@@ -173,6 +174,14 @@ void dx_compose_insert_binary_buffers(dx
void dx_compose_insert_string(dx_composed_field_t *field, const char *value);
/**
+ * Insert a utf8-encoded string into the field from an iterator.
+ *
+ * @param field A field created by dx_compose.
+ * @param iter An iterator over the octets of the string to be inserted.
+ */
+void dx_compose_insert_string_iterator(dx_composed_field_t *field, dx_field_iterator_t *iter);
+
+/**
* Insert a symbol into the field.
*
* @param field A field created by dx_compose.
Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h?rev=1497070&r1=1497069&r2=1497070&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h Wed Jun 26 20:12:07 2013
@@ -21,6 +21,7 @@
#include <qpid/dispatch/dispatch.h>
#include <qpid/dispatch/message.h>
+#include <qpid/dispatch/iterator.h>
#include <stdbool.h>
typedef struct dx_address_t dx_address_t;
@@ -38,9 +39,9 @@ dx_address_t *dx_router_register_address
void dx_router_unregister_address(dx_address_t *address);
-void dx_router_send(dx_dispatch_t *dx,
- const char *address,
- dx_message_t *msg);
+void dx_router_send(dx_dispatch_t *dx,
+ dx_field_iterator_t *address,
+ dx_message_t *msg);
#endif
Modified: qpid/trunk/qpid/extras/dispatch/src/agent.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/agent.c?rev=1497070&r1=1497069&r2=1497070&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/agent.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/agent.c Wed Jun 26 20:12:07 2013
@@ -29,11 +29,12 @@
#include <qpid/dispatch/timer.h>
#include <qpid/dispatch/router.h>
#include <qpid/dispatch/log.h>
+#include <qpid/dispatch/compose.h>
#include <string.h>
#include <stdio.h>
struct dx_agent_t {
- dx_server_t *server;
+ dx_dispatch_t *dx;
hash_t *class_hash;
dx_message_list_t in_fifo;
dx_message_list_t out_fifo;
@@ -52,15 +53,15 @@ struct dx_agent_class_t {
typedef struct {
- dx_agent_t *agent;
- dx_message_t *response_msg;
+ dx_agent_t *agent;
+ dx_composed_field_t *response;
} dx_agent_request_t;
static char *log_module = "AGENT";
-static void dx_agent_process_get(dx_agent_t *agent, dx_field_map_t *map)
+static void dx_agent_process_get(dx_agent_t *agent, dx_field_map_t *map, dx_field_iterator_t *reply_to)
{
dx_field_iterator_t *cls = dx_field_map_by_key(map, "class");
if (cls == 0)
@@ -74,11 +75,73 @@ static void dx_agent_process_get(dx_agen
dx_log(log_module, LOG_TRACE, "Received GET request for class: %s", cls_record->fqname);
+ //
+ // Compose the header
+ //
+ dx_composed_field_t *field = dx_compose(DX_PERFORMATIVE_HEADER, 0);
+ dx_compose_start_list(field);
+ dx_compose_insert_bool(field, 0); // durable
+ dx_compose_end_list(field);
+
+ //
+ // Compose the Properties
+ //
+ field = dx_compose(DX_PERFORMATIVE_PROPERTIES, field);
+ dx_compose_start_list(field);
+ dx_compose_insert_null(field); // message-id
+ dx_compose_insert_null(field); // user-id
+ dx_compose_insert_string_iterator(field, reply_to); // to
+ dx_compose_insert_null(field); // subject
+ dx_compose_insert_null(field); // reply-to
+ dx_compose_insert_string(field, "1"); // correlation-id // TODO - fix
+ dx_compose_end_list(field);
+
+ //
+ // Compose the Application Properties
+ //
+ field = dx_compose(DX_PERFORMATIVE_APPLICATION_PROPERTIES, field);
+ dx_compose_start_map(field);
+ dx_compose_insert_string(field, "operation");
+ dx_compose_insert_string(field, "get");
+
+ dx_compose_insert_string(field, "status-code");
+ dx_compose_insert_uint(field, 200);
+
+ dx_compose_insert_string(field, "status-descriptor");
+ dx_compose_insert_string(field, "OK");
+ dx_compose_end_map(field);
+
+ //
+ // Open the Body (AMQP Value) to be filled in by the handler.
+ //
+ field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field);
+ dx_compose_start_list(field);
+ dx_compose_start_map(field);
+
+ //
+ // The request record is allocated locally because the entire processing of the request
+ // will be done synchronously.
+ //
dx_agent_request_t request;
- request.agent = agent;
- request.response_msg = 0;
+ request.agent = agent;
+ request.response = field;
cls_record->query_handler(cls_record->context, 0, &request);
+
+ //
+ // The response is complete, close the list.
+ //
+ dx_compose_end_list(field);
+
+ //
+ // Create a message and send it.
+ //
+ dx_message_t *msg = dx_allocate_message();
+ dx_message_compose_2(msg, field);
+ dx_router_send(agent->dx, reply_to, msg);
+
+ dx_free_message(msg);
+ dx_compose_free(field);
}
@@ -98,6 +161,13 @@ static void dx_agent_process_request(dx_
return;
//
+ // Get an iterator for the reply-to. Exit if not found.
+ //
+ dx_field_iterator_t *reply_to = dx_message_field_iterator(msg, DX_FIELD_REPLY_TO);
+ if (reply_to == 0)
+ return;
+
+ //
// Try to get a map-view of the body. Exit if the body is not a map-value.
//
dx_field_map_t *map = dx_field_map(body, 1);
@@ -121,15 +191,16 @@ static void dx_agent_process_request(dx_
//
dx_field_iterator_t *opcode_string = dx_field_raw(opcode);
if (dx_field_iterator_equal(opcode_string, (unsigned char*) "get"))
- dx_agent_process_get(agent, map);
+ dx_agent_process_get(agent, map, reply_to);
dx_field_iterator_free(opcode_string);
dx_field_map_free(map);
dx_field_iterator_free(body);
+ dx_field_iterator_free(reply_to);
}
-static void dx_agent_timer_handler(void *context)
+static void dx_agent_deferred_handler(void *context)
{
dx_agent_t *agent = (dx_agent_t*) context;
dx_message_t *msg;
@@ -165,12 +236,12 @@ static void dx_agent_rx_handler(void *co
dx_agent_t *dx_agent(dx_dispatch_t *dx)
{
dx_agent_t *agent = NEW(dx_agent_t);
- agent->server = dx->server;
+ agent->dx = dx;
agent->class_hash = hash(6, 10, 1);
DEQ_INIT(agent->in_fifo);
DEQ_INIT(agent->out_fifo);
agent->lock = sys_mutex();
- agent->timer = dx_timer(dx, dx_agent_timer_handler, agent);
+ agent->timer = dx_timer(dx, dx_agent_deferred_handler, agent);
agent->address = dx_router_register_address(dx, true, "agent", dx_agent_rx_handler, agent);
return agent;
@@ -225,41 +296,66 @@ dx_agent_class_t *dx_agent_register_even
void dx_agent_value_string(void *correlator, const char *key, const char *value)
{
+ dx_agent_request_t *request = (dx_agent_request_t*) correlator;
+ dx_compose_insert_string(request->response, key);
+ dx_compose_insert_string(request->response, value);
}
void dx_agent_value_uint(void *correlator, const char *key, uint64_t value)
{
+ dx_agent_request_t *request = (dx_agent_request_t*) correlator;
+ dx_compose_insert_string(request->response, key);
+ dx_compose_insert_uint(request->response, value);
}
void dx_agent_value_null(void *correlator, const char *key)
{
+ dx_agent_request_t *request = (dx_agent_request_t*) correlator;
+ dx_compose_insert_string(request->response, key);
+ dx_compose_insert_null(request->response);
}
void dx_agent_value_boolean(void *correlator, const char *key, bool value)
{
+ dx_agent_request_t *request = (dx_agent_request_t*) correlator;
+ dx_compose_insert_string(request->response, key);
+ dx_compose_insert_bool(request->response, value);
}
void dx_agent_value_binary(void *correlator, const char *key, const uint8_t *value, size_t len)
{
+ dx_agent_request_t *request = (dx_agent_request_t*) correlator;
+ dx_compose_insert_string(request->response, key);
+ dx_compose_insert_binary(request->response, value, len);
}
void dx_agent_value_uuid(void *correlator, const char *key, const uint8_t *value)
{
+ dx_agent_request_t *request = (dx_agent_request_t*) correlator;
+ dx_compose_insert_string(request->response, key);
+ dx_compose_insert_uuid(request->response, value);
}
void dx_agent_value_timestamp(void *correlator, const char *key, uint64_t value)
{
+ dx_agent_request_t *request = (dx_agent_request_t*) correlator;
+ dx_compose_insert_string(request->response, key);
+ dx_compose_insert_timestamp(request->response, value);
}
void dx_agent_value_complete(void *correlator, bool more)
{
+ dx_agent_request_t *request = (dx_agent_request_t*) correlator;
+ dx_compose_end_map(request->response);
+ if (more)
+ dx_compose_start_map(request->response);
}
Modified: qpid/trunk/qpid/extras/dispatch/src/compose.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/compose.c?rev=1497070&r1=1497069&r2=1497070&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/compose.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/compose.c Wed Jun 26 20:12:07 2013
@@ -190,6 +190,16 @@ static void dx_compose_end_composite(dx_
dx_overwrite_32(&comp->count_location, comp->count);
DEQ_REMOVE_HEAD(field->fieldStack);
+
+ //
+ // If there is an enclosing composite, update its length and count
+ //
+ dx_composite_t *enclosing = DEQ_HEAD(field->fieldStack);
+ if (enclosing) {
+ enclosing->length += 4 + comp->length;
+ enclosing->count++;
+ }
+
free_dx_composite_t(comp);
}
@@ -336,10 +346,10 @@ void dx_compose_insert_timestamp(dx_comp
}
-void dx_compose_insert_uuid(dx_composed_field_t *field, const char *value)
+void dx_compose_insert_uuid(dx_composed_field_t *field, const uint8_t *value)
{
dx_insert_8(field, 0x98); // uuid
- dx_insert(field, (const uint8_t*) value, 16);
+ dx_insert(field, value, 16);
bump_count(field);
}
@@ -410,6 +420,34 @@ void dx_compose_insert_string(dx_compose
}
+void dx_compose_insert_string_iterator(dx_composed_field_t *field, dx_field_iterator_t *iter)
+{
+ uint32_t len = 0;
+
+ while (!dx_field_iterator_end(iter)) {
+ dx_field_iterator_octet(iter);
+ len++;
+ }
+
+ dx_field_iterator_reset(iter);
+
+ if (len < 256) {
+ dx_insert_8(field, 0xa1); // str8-utf8
+ dx_insert_8(field, (uint8_t) len);
+ } else {
+ dx_insert_8(field, 0xb1); // str32-utf8
+ dx_insert_32(field, len);
+ }
+
+ while (!dx_field_iterator_end(iter)) {
+ uint8_t octet = dx_field_iterator_octet(iter);
+ dx_insert_8(field, octet);
+ }
+
+ bump_count(field);
+}
+
+
void dx_compose_insert_symbol(dx_composed_field_t *field, const char *value)
{
uint32_t len = strlen(value);
Modified: qpid/trunk/qpid/extras/dispatch/src/message.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/message.c?rev=1497070&r1=1497069&r2=1497070&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/message.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/message.c Wed Jun 26 20:12:07 2013
@@ -287,6 +287,36 @@ static dx_field_location_t *dx_message_f
}
break;
+ case DX_FIELD_REPLY_TO:
+ while (1) {
+ if (content->field_reply_to.parsed)
+ return &content->field_reply_to;
+
+ if (content->section_message_properties.parsed == 0)
+ break;
+
+ dx_buffer_t *buffer = content->section_message_properties.buffer;
+ unsigned char *cursor = dx_buffer_base(buffer) + content->section_message_properties.offset;
+
+ int count = start_list(&cursor, &buffer);
+ int result;
+
+ if (count < 3)
+ break;
+
+ result = traverse_field(&cursor, &buffer, 0); // message_id
+ if (!result) return 0;
+ result = traverse_field(&cursor, &buffer, 0); // user_id
+ if (!result) return 0;
+ result = traverse_field(&cursor, &buffer, 0); // to
+ if (!result) return 0;
+ result = traverse_field(&cursor, &buffer, 0); // subject
+ if (!result) return 0;
+ result = traverse_field(&cursor, &buffer, &content->field_reply_to); // reply_to
+ if (!result) return 0;
+ }
+ break;
+
case DX_FIELD_BODY:
if (content->section_body.parsed)
return &content->section_body;
@@ -711,7 +741,7 @@ void dx_message_compose_1(dx_message_t *
field = dx_compose(DX_PERFORMATIVE_PROPERTIES, field);
dx_compose_start_list(field);
- dx_compose_insert_null(field); // compose-id
+ dx_compose_insert_null(field); // message-id
dx_compose_insert_null(field); // user-id
dx_compose_insert_string(field, to); // to
//dx_compose_insert_null(field); // subject
@@ -738,3 +768,13 @@ void dx_message_compose_1(dx_message_t *
dx_compose_free(field);
}
+
+void dx_message_compose_2(dx_message_t *msg, dx_composed_field_t *field)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ dx_buffer_list_t *field_buffers = dx_compose_buffers(field);
+
+ content->buffers = *field_buffers;
+ DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
+}
+
Modified: qpid/trunk/qpid/extras/dispatch/src/message_private.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/message_private.h?rev=1497070&r1=1497069&r2=1497070&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/message_private.h (original)
+++ qpid/trunk/qpid/extras/dispatch/src/message_private.h Wed Jun 26 20:12:07 2013
@@ -73,6 +73,7 @@ typedef struct {
dx_field_location_t section_footer; // The footer
dx_field_location_t field_user_id; // The string value of the user-id
dx_field_location_t field_to; // The string value of the to field
+ dx_field_location_t field_reply_to; // The string value of the reply_to field
dx_field_location_t body; // The body of the message
dx_buffer_t *parse_buffer;
unsigned char *parse_cursor;
Modified: qpid/trunk/qpid/extras/dispatch/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_node.c?rev=1497070&r1=1497069&r2=1497070&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_node.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_node.c Wed Jun 26 20:12:07 2013
@@ -573,9 +573,25 @@ void dx_router_unregister_address(dx_add
}
-void dx_router_send(dx_dispatch_t *dx,
- const char *address,
- dx_message_t *msg)
+void dx_router_send(dx_dispatch_t *dx,
+ dx_field_iterator_t *address,
+ dx_message_t *msg)
{
+ dx_router_t *router = dx->router;
+ dx_address_t *addr;
+
+ dx_field_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH);
+ sys_mutex_lock(router->lock);
+ hash_retrieve(router->out_hash, address, (void*) &addr);
+ if (addr) {
+ if (addr->rlink) {
+ pn_link_t *pn_outlink = dx_link_pn(addr->rlink->link);
+ dx_message_t *copy = dx_message_copy(msg);
+ DEQ_INSERT_TAIL(addr->rlink->out_fifo, copy);
+ pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo));
+ dx_link_activate(addr->rlink->link);
+ }
+ }
+ sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher?
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org