You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/06/26 22:12:07 UTC

svn commit: r1497070 - in /qpid/trunk/qpid/extras/dispatch: include/qpid/dispatch/compose.h include/qpid/dispatch/router.h src/agent.c src/compose.c src/message.c src/message_private.h src/router_node.c

Author: tross
Date: Wed Jun 26 20:12:07 2013
New Revision: 1497070

URL: http://svn.apache.org/r1497070
Log:
NO-JIRA - Management agent in the container now responds to "get" requests.

Modified:
    qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/compose.h
    qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h
    qpid/trunk/qpid/extras/dispatch/src/agent.c
    qpid/trunk/qpid/extras/dispatch/src/compose.c
    qpid/trunk/qpid/extras/dispatch/src/message.c
    qpid/trunk/qpid/extras/dispatch/src/message_private.h
    qpid/trunk/qpid/extras/dispatch/src/router_node.c

Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/compose.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/compose.h?rev=1497070&r1=1497069&r2=1497070&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/compose.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/compose.h Wed Jun 26 20:12:07 2013
@@ -20,6 +20,7 @@
  */
 
 #include <qpid/dispatch/buffer.h>
+#include <qpid/dispatch/iterator.h>
 
 typedef struct dx_composed_field_t dx_composed_field_t;
 
@@ -144,7 +145,7 @@ void dx_compose_insert_timestamp(dx_comp
  * @param field A field created by dx_compose.
  * @param value The pointer to the first octet in the UUID to be inserted.
  */
-void dx_compose_insert_uuid(dx_composed_field_t *field, const char *value);
+void dx_compose_insert_uuid(dx_composed_field_t *field, const uint8_t *value);
 
 /**
  * Insert a binary blob into the field.
@@ -173,6 +174,14 @@ void dx_compose_insert_binary_buffers(dx
 void dx_compose_insert_string(dx_composed_field_t *field, const char *value);
 
 /**
+ * Insert a utf8-encoded string into the field from an iterator
+ *
+ * @param field A field created by dx_compose.
+ * @param value A pointer to a null-terminated string.
+ */
+void dx_compose_insert_string_iterator(dx_composed_field_t *field, dx_field_iterator_t *iter);
+
+/**
  * Insert a symbol into the field.
  *
  * @param field A field created by dx_compose.

Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h?rev=1497070&r1=1497069&r2=1497070&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h Wed Jun 26 20:12:07 2013
@@ -21,6 +21,7 @@
 
 #include <qpid/dispatch/dispatch.h>
 #include <qpid/dispatch/message.h>
+#include <qpid/dispatch/iterator.h>
 #include <stdbool.h>
 
 typedef struct dx_address_t dx_address_t;
@@ -38,9 +39,9 @@ dx_address_t *dx_router_register_address
 void dx_router_unregister_address(dx_address_t *address);
 
 
-void dx_router_send(dx_dispatch_t *dx,
-                    const char    *address,
-                    dx_message_t  *msg);
+void dx_router_send(dx_dispatch_t       *dx,
+                    dx_field_iterator_t *address,
+                    dx_message_t        *msg);
 
 
 #endif

Modified: qpid/trunk/qpid/extras/dispatch/src/agent.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/agent.c?rev=1497070&r1=1497069&r2=1497070&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/agent.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/agent.c Wed Jun 26 20:12:07 2013
@@ -29,11 +29,12 @@
 #include <qpid/dispatch/timer.h>
 #include <qpid/dispatch/router.h>
 #include <qpid/dispatch/log.h>
+#include <qpid/dispatch/compose.h>
 #include <string.h>
 #include <stdio.h>
 
 struct dx_agent_t {
-    dx_server_t       *server;
+    dx_dispatch_t     *dx;
     hash_t            *class_hash;
     dx_message_list_t  in_fifo;
     dx_message_list_t  out_fifo;
@@ -52,15 +53,15 @@ struct dx_agent_class_t {
 
 
 typedef struct {
-    dx_agent_t   *agent;
-    dx_message_t *response_msg;
+    dx_agent_t          *agent;
+    dx_composed_field_t *response;
 } dx_agent_request_t;
 
 
 static char *log_module = "AGENT";
 
 
-static void dx_agent_process_get(dx_agent_t *agent, dx_field_map_t *map)
+static void dx_agent_process_get(dx_agent_t *agent, dx_field_map_t *map, dx_field_iterator_t *reply_to)
 {
     dx_field_iterator_t *cls = dx_field_map_by_key(map, "class");
     if (cls == 0)
@@ -74,11 +75,73 @@ static void dx_agent_process_get(dx_agen
 
     dx_log(log_module, LOG_TRACE, "Received GET request for class: %s", cls_record->fqname);
 
+    //
+    // Compose the header
+    //
+    dx_composed_field_t *field = dx_compose(DX_PERFORMATIVE_HEADER, 0);
+    dx_compose_start_list(field);
+    dx_compose_insert_bool(field, 0);     // durable
+    dx_compose_end_list(field);
+
+    //
+    // Compose the Properties
+    //
+    field = dx_compose(DX_PERFORMATIVE_PROPERTIES, field);
+    dx_compose_start_list(field);
+    dx_compose_insert_null(field);                       // message-id
+    dx_compose_insert_null(field);                       // user-id
+    dx_compose_insert_string_iterator(field, reply_to);  // to
+    dx_compose_insert_null(field);                       // subject
+    dx_compose_insert_null(field);                       // reply-to
+    dx_compose_insert_string(field, "1");                // correlation-id   // TODO - fix
+    dx_compose_end_list(field);
+
+    //
+    // Compose the Application Properties
+    //
+    field = dx_compose(DX_PERFORMATIVE_APPLICATION_PROPERTIES, field);
+    dx_compose_start_map(field);
+    dx_compose_insert_string(field, "operation");
+    dx_compose_insert_string(field, "get");
+
+    dx_compose_insert_string(field, "status-code");
+    dx_compose_insert_uint(field, 200);
+
+    dx_compose_insert_string(field, "status-descriptor");
+    dx_compose_insert_string(field, "OK");
+    dx_compose_end_map(field);
+
+    //
+    // Open the Body (AMQP Value) to be filled in by the handler.
+    //
+    field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field);
+    dx_compose_start_list(field);
+    dx_compose_start_map(field);
+
+    //
+    // The request record is allocated locally because the entire processing of the request
+    // will be done synchronously.
+    //
     dx_agent_request_t request;
-    request.agent        = agent;
-    request.response_msg = 0;
+    request.agent    = agent;
+    request.response = field;
 
     cls_record->query_handler(cls_record->context, 0, &request);
+
+    //
+    // The response is complete, close the list.
+    //
+    dx_compose_end_list(field);
+
+    //
+    // Create a message and send it.
+    //
+    dx_message_t *msg = dx_allocate_message();
+    dx_message_compose_2(msg, field);
+    dx_router_send(agent->dx, reply_to, msg);
+
+    dx_free_message(msg);
+    dx_compose_free(field);
 }
 
 
@@ -98,6 +161,13 @@ static void dx_agent_process_request(dx_
         return;
 
     //
+    // Get an iterator for the reply-to.  Exit if not found.
+    //
+    dx_field_iterator_t *reply_to = dx_message_field_iterator(msg, DX_FIELD_REPLY_TO);
+    if (reply_to == 0)
+        return;
+
+    //
     // Try to get a map-view of the body.  Exit if the body is not a map-value.
     //
     dx_field_map_t *map = dx_field_map(body, 1);
@@ -121,15 +191,16 @@ static void dx_agent_process_request(dx_
     //
     dx_field_iterator_t *opcode_string = dx_field_raw(opcode);
     if (dx_field_iterator_equal(opcode_string, (unsigned char*) "get"))
-        dx_agent_process_get(agent, map);
+        dx_agent_process_get(agent, map, reply_to);
 
     dx_field_iterator_free(opcode_string);
     dx_field_map_free(map);
     dx_field_iterator_free(body);
+    dx_field_iterator_free(reply_to);
 }
 
 
-static void dx_agent_timer_handler(void *context)
+static void dx_agent_deferred_handler(void *context)
 {
     dx_agent_t   *agent = (dx_agent_t*) context;
     dx_message_t *msg;
@@ -165,12 +236,12 @@ static void dx_agent_rx_handler(void *co
 dx_agent_t *dx_agent(dx_dispatch_t *dx)
 {
     dx_agent_t *agent = NEW(dx_agent_t);
-    agent->server     = dx->server;
+    agent->dx         = dx;
     agent->class_hash = hash(6, 10, 1);
     DEQ_INIT(agent->in_fifo);
     DEQ_INIT(agent->out_fifo);
     agent->lock    = sys_mutex();
-    agent->timer   = dx_timer(dx, dx_agent_timer_handler, agent);
+    agent->timer   = dx_timer(dx, dx_agent_deferred_handler, agent);
     agent->address = dx_router_register_address(dx, true, "agent", dx_agent_rx_handler, agent);
 
     return agent;
@@ -225,41 +296,66 @@ dx_agent_class_t *dx_agent_register_even
 
 void dx_agent_value_string(void *correlator, const char *key, const char *value)
 {
+    dx_agent_request_t *request = (dx_agent_request_t*) correlator;
+    dx_compose_insert_string(request->response, key);
+    dx_compose_insert_string(request->response, value);
 }
 
 
 void dx_agent_value_uint(void *correlator, const char *key, uint64_t value)
 {
+    dx_agent_request_t *request = (dx_agent_request_t*) correlator;
+    dx_compose_insert_string(request->response, key);
+    dx_compose_insert_uint(request->response, value);
 }
 
 
 void dx_agent_value_null(void *correlator, const char *key)
 {
+    dx_agent_request_t *request = (dx_agent_request_t*) correlator;
+    dx_compose_insert_string(request->response, key);
+    dx_compose_insert_null(request->response);
 }
 
 
 void dx_agent_value_boolean(void *correlator, const char *key, bool value)
 {
+    dx_agent_request_t *request = (dx_agent_request_t*) correlator;
+    dx_compose_insert_string(request->response, key);
+    dx_compose_insert_bool(request->response, value);
 }
 
 
 void dx_agent_value_binary(void *correlator, const char *key, const uint8_t *value, size_t len)
 {
+    dx_agent_request_t *request = (dx_agent_request_t*) correlator;
+    dx_compose_insert_string(request->response, key);
+    dx_compose_insert_binary(request->response, value, len);
 }
 
 
 void dx_agent_value_uuid(void *correlator, const char *key, const uint8_t *value)
 {
+    dx_agent_request_t *request = (dx_agent_request_t*) correlator;
+    dx_compose_insert_string(request->response, key);
+    dx_compose_insert_uuid(request->response, value);
 }
 
 
 void dx_agent_value_timestamp(void *correlator, const char *key, uint64_t value)
 {
+    dx_agent_request_t *request = (dx_agent_request_t*) correlator;
+    dx_compose_insert_string(request->response, key);
+    dx_compose_insert_timestamp(request->response, value);
 }
 
 
 void dx_agent_value_complete(void *correlator, bool more)
 {
+    dx_agent_request_t *request = (dx_agent_request_t*) correlator;
+    dx_compose_end_map(request->response);
+    if (more)
+        dx_compose_start_map(request->response);
 }
 
 

Modified: qpid/trunk/qpid/extras/dispatch/src/compose.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/compose.c?rev=1497070&r1=1497069&r2=1497070&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/compose.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/compose.c Wed Jun 26 20:12:07 2013
@@ -190,6 +190,16 @@ static void dx_compose_end_composite(dx_
     dx_overwrite_32(&comp->count_location,  comp->count);
 
     DEQ_REMOVE_HEAD(field->fieldStack);
+
+    //
+    // If there is an enclosing composite, update its length and count
+    //
+    dx_composite_t *enclosing = DEQ_HEAD(field->fieldStack);
+    if (enclosing) {
+        enclosing->length += 4 + comp->length;
+        enclosing->count++;
+    }
+
     free_dx_composite_t(comp);
 }
 
@@ -336,10 +346,10 @@ void dx_compose_insert_timestamp(dx_comp
 }
 
 
-void dx_compose_insert_uuid(dx_composed_field_t *field, const char *value)
+void dx_compose_insert_uuid(dx_composed_field_t *field, const uint8_t *value)
 {
     dx_insert_8(field, 0x98);  // uuid
-    dx_insert(field, (const uint8_t*) value, 16);
+    dx_insert(field, value, 16);
     bump_count(field);
 }
 
@@ -410,6 +420,34 @@ void dx_compose_insert_string(dx_compose
 }
 
 
+void dx_compose_insert_string_iterator(dx_composed_field_t *field, dx_field_iterator_t *iter)
+{
+    uint32_t len = 0;
+
+    while (!dx_field_iterator_end(iter)) {
+        dx_field_iterator_octet(iter);
+        len++;
+    }
+
+    dx_field_iterator_reset(iter);
+
+    if (len < 256) {
+        dx_insert_8(field, 0xa1);  // str8-utf8
+        dx_insert_8(field, (uint8_t) len);
+    } else {
+        dx_insert_8(field, 0xb1);  // str32-utf8
+        dx_insert_32(field, len);
+    }
+
+    while (!dx_field_iterator_end(iter)) {
+        uint8_t octet = dx_field_iterator_octet(iter);
+        dx_insert_8(field, octet);
+    }
+
+    bump_count(field);
+}
+
+
 void dx_compose_insert_symbol(dx_composed_field_t *field, const char *value)
 {
     uint32_t len = strlen(value);

Modified: qpid/trunk/qpid/extras/dispatch/src/message.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/message.c?rev=1497070&r1=1497069&r2=1497070&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/message.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/message.c Wed Jun 26 20:12:07 2013
@@ -287,6 +287,36 @@ static dx_field_location_t *dx_message_f
         }
         break;
 
+    case DX_FIELD_REPLY_TO:
+        while (1) {
+            if (content->field_reply_to.parsed)
+                return &content->field_reply_to;
+
+            if (content->section_message_properties.parsed == 0)
+                break;
+
+            dx_buffer_t   *buffer = content->section_message_properties.buffer;
+            unsigned char *cursor = dx_buffer_base(buffer) + content->section_message_properties.offset;
+
+            int count = start_list(&cursor, &buffer);
+            int result;
+
+            if (count < 3)
+                break;
+
+            result = traverse_field(&cursor, &buffer, 0); // message_id
+            if (!result) return 0;
+            result = traverse_field(&cursor, &buffer, 0); // user_id
+            if (!result) return 0;
+            result = traverse_field(&cursor, &buffer, 0); // to
+            if (!result) return 0;
+            result = traverse_field(&cursor, &buffer, 0); // subject
+            if (!result) return 0;
+            result = traverse_field(&cursor, &buffer, &content->field_reply_to); // reply_to
+            if (!result) return 0;
+        }
+        break;
+
     case DX_FIELD_BODY:
         if (content->section_body.parsed)
             return &content->section_body;
@@ -711,7 +741,7 @@ void dx_message_compose_1(dx_message_t *
 
     field = dx_compose(DX_PERFORMATIVE_PROPERTIES, field);
     dx_compose_start_list(field);
-    dx_compose_insert_null(field);          // compose-id
+    dx_compose_insert_null(field);          // message-id
     dx_compose_insert_null(field);          // user-id
     dx_compose_insert_string(field, to);    // to
     //dx_compose_insert_null(field);          // subject
@@ -738,3 +768,13 @@ void dx_message_compose_1(dx_message_t *
     dx_compose_free(field);
 }
 
+
+void dx_message_compose_2(dx_message_t *msg, dx_composed_field_t *field)
+{
+    dx_message_content_t *content       = MSG_CONTENT(msg);
+    dx_buffer_list_t     *field_buffers = dx_compose_buffers(field);
+
+    content->buffers = *field_buffers;
+    DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
+}
+

Modified: qpid/trunk/qpid/extras/dispatch/src/message_private.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/message_private.h?rev=1497070&r1=1497069&r2=1497070&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/message_private.h (original)
+++ qpid/trunk/qpid/extras/dispatch/src/message_private.h Wed Jun 26 20:12:07 2013
@@ -73,6 +73,7 @@ typedef struct {
     dx_field_location_t  section_footer;                  // The footer
     dx_field_location_t  field_user_id;                   // The string value of the user-id
     dx_field_location_t  field_to;                        // The string value of the to field
+    dx_field_location_t  field_reply_to;                  // The string value of the reply_to field
     dx_field_location_t  body;                            // The body of the message
     dx_buffer_t         *parse_buffer;
     unsigned char       *parse_cursor;

Modified: qpid/trunk/qpid/extras/dispatch/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_node.c?rev=1497070&r1=1497069&r2=1497070&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_node.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_node.c Wed Jun 26 20:12:07 2013
@@ -573,9 +573,25 @@ void dx_router_unregister_address(dx_add
 }
 
 
-void dx_router_send(dx_dispatch_t *dx,
-                    const char    *address,
-                    dx_message_t  *msg)
+void dx_router_send(dx_dispatch_t       *dx,
+                    dx_field_iterator_t *address,
+                    dx_message_t        *msg)
 {
+    dx_router_t  *router = dx->router;
+    dx_address_t *addr;
+
+    dx_field_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH);
+    sys_mutex_lock(router->lock);
+    hash_retrieve(router->out_hash, address, (void*) &addr);
+    if (addr) {
+        if (addr->rlink) {
+            pn_link_t    *pn_outlink = dx_link_pn(addr->rlink->link);
+            dx_message_t *copy       = dx_message_copy(msg);
+            DEQ_INSERT_TAIL(addr->rlink->out_fifo, copy);
+            pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo));
+            dx_link_activate(addr->rlink->link);
+        }
+    }
+    sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher?
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org